[Rollup] Improve ID scheme for rollup documents (#32558)
Previously, we were using a simple CRC32 for the IDs of rollup documents.
This was a poor choice, however, since 32-bit IDs lead to collisions between
documents very quickly: by the birthday bound, there is roughly a 50% chance
of at least one collision after only ~77,000 documents.

This commit moves Rollups over to a 128-bit ID.  The ID is a concatenation
of all the keys in the document (similar to the rolling CRC before),
hashed with 128-bit Murmur3, then base64 encoded.  Finally, the job
ID and a delimiter (`$`) are prepended to the ID.

This guarantees that there are 128 bits per job.  128 bits should
essentially remove all chance of collisions, and the prepended
job ID means that _if_ there is a collision, it stays "within"
the job.
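
For illustration, here is a minimal sketch of the scheme described above. It uses Guava's Murmur3 implementation and an arbitrary null placeholder purely as stand-ins; the commit's actual `RollupIDGenerator.Murmur3` handles key encoding and null values itself:

```java
import com.google.common.hash.Hashing;

import java.nio.charset.StandardCharsets;
import java.util.Base64;

class RollupIdSketch {
    // Illustrative only: concatenate the document's key values, hash the result with
    // 128-bit Murmur3, base64-encode the 16-byte hash, and prepend "<jobId>$".
    static String rollupDocId(String jobId, Iterable<Object> keyValues) {
        StringBuilder concatenated = new StringBuilder();
        for (Object v : keyValues) {
            // "__NULL__" is an arbitrary placeholder for null keys in this sketch
            concatenated.append(v == null ? "__NULL__" : v.toString());
        }
        byte[] hash = Hashing.murmur3_128()
                .hashString(concatenated, StandardCharsets.UTF_8)
                .asBytes();                                        // 16 bytes = 128 bits
        String encoded = Base64.getUrlEncoder().withoutPadding().encodeToString(hash);
        return jobId + "$" + encoded;   // a collision can only happen within one job
    }
}
```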

BWC notes:

We can only upgrade the ID scheme after we know there has been a good
checkpoint during indexing.  We don't rely on a STARTED/STOPPED
status, since we can't guarantee that it resulted from a real checkpoint
rather than some other state change.  So we only upgrade the ID scheme after
we have reached a checkpoint state during an active index run, and only after
that checkpoint has been confirmed.

Once a job has been upgraded and checkpointed, the version increments
and the new ID scheme is used from then on.  All new jobs use the
new ID scheme from the start.
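
The flag recorded in `RollupJobStatus` is what selects the scheme at indexing time. A hypothetical call site (the variable names and config getters are illustrative, not the exact task code in this commit) might look like:

```java
// Assumes a RollupJobStatus 'status', a job config 'config' exposing the rollup index,
// group config and job ID, plus the current page's CompositeAggregation and stats.
boolean upgraded = status.isUpgradedDocumentID();    // false for un-upgraded pre-6.4 jobs

List<IndexRequest> docs = IndexerUtils.processBuckets(
        compositeAgg,               // composite aggregation response for this page of buckets
        config.getRollupIndex(),    // index that holds rollups for this job
        stats,                      // the job's stats accumulator
        config.getGroupConfig(),    // grouping configuration
        config.getId(),             // job ID, prepended to new-style document IDs
        upgraded);                  // true -> Murmur3/base64 IDs, false -> legacy CRC32 IDs
```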
polyfractal authored Aug 3, 2018
1 parent 3d4c84f commit fc9fb64
Showing 21 changed files with 1,054 additions and 170 deletions.
9 changes: 6 additions & 3 deletions x-pack/docs/en/rest-api/rollup/get-job.asciidoc
@@ -93,7 +93,8 @@ Which will yield the following response:
"page_size" : 1000
},
"status" : {
"job_state" : "stopped"
"job_state" : "stopped",
"upgraded_doc_id": true
},
"stats" : {
"pages_processed" : 0,
@@ -212,7 +213,8 @@ Which will yield the following response:
"page_size" : 1000
},
"status" : {
"job_state" : "stopped"
"job_state" : "stopped",
"upgraded_doc_id": true
},
"stats" : {
"pages_processed" : 0,
@@ -260,7 +262,8 @@ Which will yield the following response:
"page_size" : 1000
},
"status" : {
"job_state" : "stopped"
"job_state" : "stopped",
"upgraded_doc_id": true
},
"stats" : {
"pages_processed" : 0,
(next changed file)
@@ -6,6 +6,7 @@
package org.elasticsearch.xpack.core.rollup.job;


import org.elasticsearch.Version;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.ParseField;
import org.elasticsearch.common.io.stream.StreamInput;
@@ -39,12 +40,19 @@ public class RollupJobStatus implements Task.Status, PersistentTaskState {
@Nullable
private final TreeMap<String, Object> currentPosition;

// Flag holds the state of the ID scheme, e.g. if it has been upgraded to the
// concatenation scheme. See #32372 for more details
private boolean upgradedDocumentID;

private static final ParseField STATE = new ParseField("job_state");
private static final ParseField CURRENT_POSITION = new ParseField("current_position");
private static final ParseField UPGRADED_DOC_ID = new ParseField("upgraded_doc_id");

public static final ConstructingObjectParser<RollupJobStatus, Void> PARSER =
new ConstructingObjectParser<>(NAME,
args -> new RollupJobStatus((IndexerState) args[0], (HashMap<String, Object>) args[1]));
args -> new RollupJobStatus((IndexerState) args[0],
(HashMap<String, Object>) args[1],
(Boolean)args[2]));

static {
PARSER.declareField(constructorArg(), p -> {
@@ -62,16 +70,28 @@ public class RollupJobStatus implements Task.Status, PersistentTaskState {
}
throw new IllegalArgumentException("Unsupported token [" + p.currentToken() + "]");
}, CURRENT_POSITION, ObjectParser.ValueType.VALUE_OBJECT_ARRAY);

// Optional to accommodate old versions of state
PARSER.declareBoolean(ConstructingObjectParser.optionalConstructorArg(), UPGRADED_DOC_ID);
}

public RollupJobStatus(IndexerState state, @Nullable Map<String, Object> position) {
public RollupJobStatus(IndexerState state, @Nullable Map<String, Object> position,
@Nullable Boolean upgradedDocumentID) {
this.state = state;
this.currentPosition = position == null ? null : new TreeMap<>(position);
this.upgradedDocumentID = upgradedDocumentID != null ? upgradedDocumentID : false; //default to false if missing
}

public RollupJobStatus(StreamInput in) throws IOException {
state = IndexerState.fromStream(in);
currentPosition = in.readBoolean() ? new TreeMap<>(in.readMap()) : null;
if (in.getVersion().onOrAfter(Version.V_7_0_0_alpha1)) { //TODO change this after backport
upgradedDocumentID = in.readBoolean();
} else {
// If we're getting this job from a pre-6.4.0 node,
// it is using the old ID scheme
upgradedDocumentID = false;
}
}

public IndexerState getIndexerState() {
@@ -82,6 +102,10 @@ public Map<String, Object> getPosition() {
return currentPosition;
}

public boolean isUpgradedDocumentID() {
return upgradedDocumentID;
}

public static RollupJobStatus fromXContent(XContentParser parser) {
try {
return PARSER.parse(parser, null);
@@ -97,6 +121,7 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws
if (currentPosition != null) {
builder.field(CURRENT_POSITION.getPreferredName(), currentPosition);
}
builder.field(UPGRADED_DOC_ID.getPreferredName(), upgradedDocumentID);
builder.endObject();
return builder;
}
@@ -113,6 +138,9 @@ public void writeTo(StreamOutput out) throws IOException {
if (currentPosition != null) {
out.writeMap(currentPosition);
}
if (out.getVersion().onOrAfter(Version.V_7_0_0_alpha1)) { //TODO change this after backport
out.writeBoolean(upgradedDocumentID);
}
}

@Override
@@ -128,11 +156,12 @@ public boolean equals(Object other) {
RollupJobStatus that = (RollupJobStatus) other;

return Objects.equals(this.state, that.state)
&& Objects.equals(this.currentPosition, that.currentPosition);
&& Objects.equals(this.currentPosition, that.currentPosition)
&& Objects.equals(this.upgradedDocumentID, that.upgradedDocumentID);
}

@Override
public int hashCode() {
return Objects.hash(state, currentPosition);
return Objects.hash(state, currentPosition, upgradedDocumentID);
}
}
(next changed file)
@@ -41,6 +41,6 @@ protected GetRollupJobsAction.JobWrapper createTestInstance() {

return new GetRollupJobsAction.JobWrapper(ConfigTestHelpers.getRollupJob(randomAlphaOfLength(5)).build(),
new RollupJobStats(randomNonNegativeLong(), randomNonNegativeLong(), randomNonNegativeLong(), randomNonNegativeLong()),
new RollupJobStatus(state, Collections.emptyMap()));
new RollupJobStatus(state, Collections.emptyMap(), randomBoolean()));
}
}
(next changed file)
@@ -8,8 +8,6 @@
import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.test.AbstractSerializingTestCase;
import org.elasticsearch.xpack.core.rollup.job.IndexerState;
import org.elasticsearch.xpack.core.rollup.job.RollupJobStatus;

import java.util.HashMap;
import java.util.Map;
@@ -35,7 +33,7 @@ private Map<String, Object> randomPosition() {

@Override
protected RollupJobStatus createTestInstance() {
return new RollupJobStatus(randomFrom(IndexerState.values()), randomPosition());
return new RollupJobStatus(randomFrom(IndexerState.values()), randomPosition(), randomBoolean());
}

@Override
(next changed file)
@@ -80,7 +80,14 @@
public class Rollup extends Plugin implements ActionPlugin, PersistentTaskPlugin {

public static final String BASE_PATH = "/_xpack/rollup/";
public static final int ROLLUP_VERSION = 1;

// Introduced in ES version 6.3
public static final int ROLLUP_VERSION_V1 = 1;
// Introduced in ES Version 6.4
// Bumped due to ID collision, see #32372
public static final int ROLLUP_VERSION_V2 = 2;
public static final int CURRENT_ROLLUP_VERSION = ROLLUP_VERSION_V2;

public static final String TASK_THREAD_POOL_NAME = RollupField.NAME + "_indexing";
public static final String SCHEDULE_THREAD_POOL_NAME = RollupField.NAME + "_scheduler";

(next changed file)
@@ -191,7 +191,9 @@ static MultiSearchRequest createMSearchRequest(SearchRequest request, NamedWrite
copiedSource.query(new BoolQueryBuilder()
.must(rewritten)
.filter(new TermQueryBuilder(RollupField.formatMetaField(RollupField.ID.getPreferredName()), id))
.filter(new TermQueryBuilder(RollupField.formatMetaField(RollupField.VERSION_FIELD), Rollup.ROLLUP_VERSION)));
// Both versions are acceptable right now since they are compatible at search time
.filter(new TermsQueryBuilder(RollupField.formatMetaField(RollupField.VERSION_FIELD),
new long[]{Rollup.ROLLUP_VERSION_V1, Rollup.ROLLUP_VERSION_V2})));

// And add a new msearch per JobID
msearch.add(new SearchRequest(context.getRollupIndices(), copiedSource).types(request.types()));
(next changed file)
@@ -8,7 +8,6 @@
import org.apache.log4j.Logger;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.common.Numbers;
import org.elasticsearch.search.aggregations.Aggregation;
import org.elasticsearch.search.aggregations.bucket.composite.CompositeAggregation;
import org.elasticsearch.search.aggregations.bucket.histogram.DateHistogramAggregationBuilder;
@@ -21,14 +20,12 @@
import org.elasticsearch.xpack.core.rollup.job.RollupJobStats;
import org.elasticsearch.xpack.rollup.Rollup;

import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.stream.Collectors;
import java.util.zip.CRC32;

/**
* These utilities are used to convert agg responses into a set of rollup documents.
@@ -41,12 +38,16 @@ class IndexerUtils {
* The only entry point in this class. You hand this method an aggregation and an index
* pattern, and it returns a list of rolled documents that you can index
*
* @param agg The aggregation response that you want to rollup
* @param rollupIndex The index that holds rollups for this job
* @param agg The aggregation response that you want to rollup
* @param rollupIndex The index that holds rollups for this job
* @param stats The stats accumulator for this job's task
* @param groupConfig The grouping configuration for the job
* @param jobId The ID for the job
* @param isUpgradedDocID `true` if this job is using the new ID scheme
* @return A list of rolled documents derived from the response
*/
static List<IndexRequest> processBuckets(CompositeAggregation agg, String rollupIndex, RollupJobStats stats,
GroupConfig groupConfig, String jobId) {
GroupConfig groupConfig, String jobId, boolean isUpgradedDocID) {

logger.debug("Buckets: [" + agg.getBuckets().size() + "][" + jobId + "]");
return agg.getBuckets().stream().map(b ->{
@@ -57,24 +58,30 @@ static List<IndexRequest> processBuckets(CompositeAggregation agg, String rollup
TreeMap<String, Object> keys = new TreeMap<>(b.getKey());
List<Aggregation> metrics = b.getAggregations().asList();

RollupIDGenerator idGenerator;
if (isUpgradedDocID) {
idGenerator = new RollupIDGenerator.Murmur3(jobId);
} else {
idGenerator = new RollupIDGenerator.CRC();
}
Map<String, Object> doc = new HashMap<>(keys.size() + metrics.size());
CRC32 docId = processKeys(keys, doc, b.getDocCount(), groupConfig);
byte[] vs = jobId.getBytes(StandardCharsets.UTF_8);
docId.update(vs, 0, vs.length);

processKeys(keys, doc, b.getDocCount(), groupConfig, idGenerator);
idGenerator.add(jobId);
processMetrics(metrics, doc);

doc.put(RollupField.ROLLUP_META + "." + RollupField.VERSION_FIELD, Rollup.ROLLUP_VERSION);
doc.put(RollupField.ROLLUP_META + "." + RollupField.VERSION_FIELD,
isUpgradedDocID ? Rollup.CURRENT_ROLLUP_VERSION : Rollup.ROLLUP_VERSION_V1);
doc.put(RollupField.ROLLUP_META + "." + RollupField.ID.getPreferredName(), jobId);

IndexRequest request = new IndexRequest(rollupIndex, RollupField.TYPE_NAME, String.valueOf(docId.getValue()));
IndexRequest request = new IndexRequest(rollupIndex, RollupField.TYPE_NAME, idGenerator.getID());
request.source(doc);
return request;
}).collect(Collectors.toList());
}

private static CRC32 processKeys(Map<String, Object> keys, Map<String, Object> doc, long count, GroupConfig groupConfig) {
CRC32 docID = new CRC32();

private static void processKeys(Map<String, Object> keys, Map<String, Object> doc,
long count, GroupConfig groupConfig, RollupIDGenerator idGenerator) {
keys.forEach((k, v) -> {
// Also add a doc count for each key. This will duplicate data, but makes search easier later
doc.put(k + "." + RollupField.COUNT_FIELD, count);
@@ -83,37 +90,34 @@ private static CRC32 processKeys(Map<String, Object> keys, Map<String, Object> d
assert v != null;
doc.put(k + "." + RollupField.TIMESTAMP, v);
doc.put(k + "." + RollupField.INTERVAL, groupConfig.getDateHisto().getInterval());
doc.put(k + "." + DateHistogramGroupConfig.TIME_ZONE, groupConfig.getDateHisto().getTimeZone());
docID.update(Numbers.longToBytes((Long)v), 0, 8);
doc.put(k + "." + DateHistogramGroupConfig.TIME_ZONE, groupConfig.getDateHisto().getTimeZone().toString());
idGenerator.add((Long)v);
} else if (k.endsWith("." + HistogramAggregationBuilder.NAME)) {
doc.put(k + "." + RollupField.VALUE, v);
doc.put(k + "." + RollupField.INTERVAL, groupConfig.getHisto().getInterval());
if (v == null) {
// Arbitrary value to update the doc ID with for nulls
docID.update(19);
idGenerator.addNull();
} else {
docID.update(Numbers.doubleToBytes((Double) v), 0, 8);
idGenerator.add((Double) v);
}
} else if (k.endsWith("." + TermsAggregationBuilder.NAME)) {
doc.put(k + "." + RollupField.VALUE, v);
if (v == null) {
// Arbitrary value to update the doc ID with for nulls
docID.update(19);
idGenerator.addNull();
} else if (v instanceof String) {
byte[] vs = ((String) v).getBytes(StandardCharsets.UTF_8);
docID.update(vs, 0, vs.length);
idGenerator.add((String)v);
} else if (v instanceof Long) {
docID.update(Numbers.longToBytes((Long)v), 0, 8);
idGenerator.add((Long)v);
} else if (v instanceof Double) {
docID.update(Numbers.doubleToBytes((Double)v), 0, 8);
idGenerator.add((Double)v);
} else {
throw new RuntimeException("Encountered value of type [" + v.getClass() + "], which was unable to be processed.");
throw new RuntimeException("Encountered value of type ["
+ v.getClass() + "], which was unable to be processed.");
}
} else {
throw new ElasticsearchException("Could not identify key in agg [" + k + "]");
}
});
return docID;
}

private static void processMetrics(List<Aggregation> metrics, Map<String, Object> doc) {
(remaining changed files not shown)
