
[Rollup] Improve ID scheme for rollup documents #32558

Merged
merged 6 commits into from
Aug 3, 2018

Conversation

polyfractal
Contributor

Previously, we were using a simple CRC32 for the IDs of rollup documents. This was a poor choice, however, since 32-bit IDs lead to collisions between documents very quickly.

This PR moves Rollups over to a 128bit ID. The ID is a concatenation of all the keys in the document (similar to the rolling CRC before), hashed with 128bit Murmur3, then base64 encoded. Finally, the job ID and a delimiter ($) are prepended to the ID.

This guarantees 128 bits of ID space per job. 128 bits should essentially remove any chance of collisions, and the prepended job ID means that if there is a collision, it stays "within" the job.
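The scheme can be sketched in a few lines. This is a hedged illustration, not the actual Elasticsearch code (which is Java): `make_rollup_id` is a hypothetical name, and `hashlib.md5` stands in for 128-bit Murmur3 only because Murmur3 is not in the Python standard library.

```python
import base64
import hashlib

def make_rollup_id(job_id: str, keys: list) -> str:
    """Sketch of the new ID scheme: concatenate the composite keys,
    hash to 128 bits, base64-encode (URL-safe, no padding), and
    prepend the job ID plus a '$' delimiter.

    NOTE: MD5 is a stand-in for MurmurHash3_x64_128 here, chosen only
    because it is a 128-bit hash available in the stdlib."""
    concatenated = "$".join(str(k) for k in keys).encode("utf-8")
    digest = hashlib.md5(concatenated).digest()  # 16 bytes = 128 bits
    encoded = base64.urlsafe_b64encode(digest).rstrip(b"=").decode("ascii")
    return job_id + "$" + encoded

# All IDs from the same job share the "my-job$" prefix, so any
# collision stays within that job.
doc_id = make_rollup_id("my-job", ["2018-01-02T00:00:01", 345])
```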

BWC notes:

We can only upgrade the ID scheme after we know there has been a good checkpoint during indexing. We don't rely on a STARTED/STOPPED status, since we can't guarantee that it resulted from a real checkpoint rather than some other circumstance. So we only upgrade the ID after we have reached a checkpoint state during an active index run, and only after the checkpoint has been confirmed.

Once a job has been upgraded and checkpointed, the version increments and the new ID is used in the future. All new jobs use the new ID from the start.
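The gating logic above can be sketched as follows. All names here (`RollupJobStatus` stand-in, `id_scheme_for`, `on_checkpoint_confirmed`) are hypothetical illustrations of the rule, not the actual Elasticsearch API.

```python
class RollupJobStatus:
    """Hypothetical stand-in for the persisted job status; only the
    upgraded_doc_id flag matters for this sketch."""
    def __init__(self, upgraded_doc_id: bool = False):
        self.upgraded_doc_id = upgraded_doc_id

def id_scheme_for(status: RollupJobStatus) -> str:
    # Old jobs keep CRC32 IDs until the flag flips; new jobs start
    # with the flag set and use the 128-bit scheme from the start.
    return "murmur3-128" if status.upgraded_doc_id else "crc32"

def on_checkpoint_confirmed(status: RollupJobStatus) -> None:
    # The flag flips only here, after a confirmed checkpoint during an
    # active index run -- never on a bare STARTED/STOPPED transition.
    status.upgraded_doc_id = True
```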

The flag for this is stored in the RollupJobStatus and persisted through the persistent task framework. I would have preferred this flag to live on the RollupJob itself, but the persistent task framework only allows updates to the Status, not the Params. That's why the "upgraded_doc_id" flag leaks into the JSON everywhere. It will probably be a useful diagnostic/support tool, though, so I don't think it's too terrible.

Testing:
Adds both a rolling-upgrade BWC test and a full-cluster-restart test. Might be overkill, but I wanted to make sure it didn't break anything.

Closes #32372

@polyfractal polyfractal added review v7.0.0 :StorageEngine/Rollup Turn fine-grained time-based data into coarser-grained data v6.4.0 v6.5.0 labels Aug 1, 2018
@polyfractal polyfractal requested a review from jimczi August 1, 2018 17:17
@elasticmachine
Collaborator

Pinging @elastic/es-search-aggs

@@ -54,6 +55,7 @@
private final AtomicReference<IndexerState> state;
private final AtomicReference<Map<String, Object>> position;
private final Executor executor;
protected final AtomicBoolean upgradedDocumentID;
Contributor Author

Note: an atomic is probably overkill here... only the indexer thread can update it, so a volatile probably would have sufficed. But I didn't want to mess around with concurrency semantics in this bugfix, so I went with a simple, easily reasoned-about atomic.

@@ -240,6 +262,8 @@ public synchronized void start(ActionListener<StartRollupJobAction.Response> lis
listener.onResponse(new StartRollupJobAction.Response(true));
},
(exc) -> {
// We were unable to update the persistent status, so we need to shutdown the indexer too.
indexer.stop();
Contributor Author

This is unrelated to the bugfix, but it looked like a bug in its own right, so I fixed it here. We shouldn't let the indexer keep running if we weren't able to persist the state; that could lead to a strange situation where we are persisted as stopped but still indexing a bunch of data.

// 2. we persist STOPPED now, indexer continues a bit but then dies. When/if we resume we'll pick up
// at last checkpoint, overwrite some docs and eventually checkpoint. At that time we'll also
// upgrade the ID scheme
RollupJobStatus state = new RollupJobStatus(IndexerState.STOPPED, indexer.getPosition(), upgradedDocumentID.get());
Contributor Author

This is just a clarification because the old comment was equal parts vague, confusing and a bit wrong.

Contributor

@jimczi jimczi left a comment

Thanks @polyfractal ! It looks good overall.
I left some minor comments regarding the hash that we generate but otherwise LGTM.

byte[] hashedBytes = new byte[16];
System.arraycopy(Numbers.longToBytes(hasher.h1), 0, hashedBytes, 0, 8);
System.arraycopy(Numbers.longToBytes(hasher.h2), 0, hashedBytes, 8, 8);
return jobId + "$" + Base64.getUrlEncoder().withoutPadding().encodeToString(hashedBytes);
Contributor

Since we use 128 bits, I think it's OK to just update the hash with the jobId instead of adding it as a prefix?

Contributor

The prefix compression should handle the jobId efficiently, so let's keep it this way. This will prevent collisions across jobs, so please forget my last comment ;).

Contributor Author

Sure, that works for me. Just need to tweak the tests a bit, since I was using the job ID as a shortcut for "old vs new" ID... but we can use length instead :)

Contributor Author

For posterity we chatted in slack and decided to leave the job name prepended. Prefix encoding should make the overhead minimal, and it is convenient to have the job ID attached (as well as preventing some other forms of collisions due to same job but different interval, etc)

initialState = IndexerState.STOPPED;
} else {
initialState = existingState;
}
initialPosition = state.getPosition();

// Since we have state, there could be an incomplete checkpoint, so
// use the state's ID scheme
Contributor

nit: can you clarify? ;)

*/
public static class Murmur3 extends RollupIDGenerator {
private static final long SEED = 19;
private static final BytesRef DELIM = new BytesRef("$".getBytes(StandardCharsets.UTF_8));
Contributor

new BytesRef("$") should be enough

public static class Murmur3 extends RollupIDGenerator {
private static final long SEED = 19;
private static final BytesRef DELIM = new BytesRef("$".getBytes(StandardCharsets.UTF_8));
private static final byte[] NULL_PLACEHOLDER = "__NULL_PLACEHOLDER__830f1de2__".getBytes(StandardCharsets.UTF_8);
Contributor

let's use a BytesRef ?

public abstract void addNull();
public abstract String getID();

private boolean generated = false;
Contributor

I haven't proved this to myself by writing a multithreaded driver, but, looking at the code, I am pretty certain this class can't be used in a multithreaded environment, because the check and the setting of the flag are not coordinated. If I am not wrong, we should probably either document this or potentially re-implement the class as a builder, where mutating methods return a modified instance? 🤷‍♂️

Contributor Author

I'll add a javadoc explaining the non-threadsafe nature.

This is used by the indexer, which is single-threaded in Rollup (only a single thread per job runs the indexer). We could make it an AtomicBoolean and make it thread-safe, but given the current Rollup design that's overkill. I don't see this being used outside of Rollup right now, so ++ to just documenting the limitation.
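The single-use, check-then-set pattern under discussion looks roughly like this (a Python sketch with hypothetical names, not the actual generator class):

```python
class OneShotIDGenerator:
    """Mimics the non-thread-safe 'generated' guard: once the ID has
    been produced, further use raises. The check and the flag update
    are not atomic, so two threads could both pass the check -- this
    is safe only because Rollup runs one indexer thread per job."""
    def __init__(self):
        self._generated = False
        self._parts = []

    def add(self, value) -> None:
        if self._generated:
            raise RuntimeError("cannot add keys after ID generation")
        self._parts.append(str(value))

    def get_id(self) -> str:
        if self._generated:
            raise RuntimeError("ID already generated")
        self._generated = True  # check and set are two separate steps
        return "$".join(self._parts)
```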

}

@SuppressWarnings("unchecked")
private Map<String, Object> getJob(Map<String, Object> jobsMap, String targetJobId) throws IOException {
Contributor

Can this be static? I'm not sure of our convention here, but I've always biased towards making methods that don't depend on instance state static. Same for getJob. Kind of a nit, though.

Contributor Author

Can do. :)

client().performRequest(indexRequest);

// create the rollup job
final Request createRollupJobRequest = new Request("PUT", "/_xpack/rollup/job/rollup-id-test");
Contributor

Is it worth creating a constant for the _xpack/rollup/job/ part of the path?


if (CLUSTER_TYPE == ClusterType.MIXED && Booleans.parseBoolean(System.getProperty("tests.first_round"))) {
final Request indexRequest = new Request("POST", "/target/_doc/2");
indexRequest.setJsonEntity("{\"timestamp\":\"2018-01-02T00:00:01\",\"value\":345}");
Contributor

Kind of a nit, but when I've done this kind of thing, I always regret it when I use hardcoded timestamps. Can we refactor to use a base timestamp and then use date math to add a day and such? That would make it easier to randomize the base timestamp, and make it clearer what the test is doing.

Contributor Author

We can... but it will complicate the test and may make it flakier.

The IDs that are generated in rollup docs are essentially a concatenation of the composite agg keys, which are then hashed (either CRC32 in old, or Murmur3 in new). The composite keys will be values like the "2018-01-02T00:00:01" and 345.

So by hardcoding the timestamps we know what IDs are going to be generated at each phase of the rolling upgrade and can test for them explicitly.

If we randomize, we'll also have to run the values through the ID generator to create the hashes. I always get worried when I have to use part of the thing under test to verify the test itself. E.g. if we break the ID generator in the future, the "broken" IDs would then change in the test and we might not notice.

We do have some unit tests that check the ID itself (IndexerUtilsTests#testKeyOrderingOldID(), IndexerUtilsTests#testKeyOrderingNewID(), IndexerUtilsTests#testKeyOrderingNewIDLong()) so maybe it's not an issue... but it does cause me a bit of concern.

WDYT?

In either case, I can document what this test is doing so it's clearer that the values being indexed directly affect the IDs being verified.

Contributor

That seems fine. FWIW, I am not saying we should necessarily randomize, but that for readability and maintainability it's clearer to have (forgive the pseudocode)

DateTime baseDate = new DateTime("2010-01-01")
indexRequest.setJsonEntity("timestamp"+baseDate.plusDays(1).toString());
....
indexRequest.setJsonEntity("timestamp"+baseDate.plusDays(2).toString());
...

Again, this is kind of a nit, but I've found as test scope grows that hardcoded dates that depend on each other throughout the tests are hard to manage, and mutating a base date makes refactoring easier and makes it clearer to our future selves what the test is doing.
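In Python terms, the suggestion amounts to deriving every timestamp in the test from a single base via date math (a sketch, not the actual test code; `BASE` and `doc_timestamp` are hypothetical names):

```python
from datetime import datetime, timedelta

# One base timestamp for the whole test, instead of scattering
# hardcoded strings like "2018-01-02T00:00:01" through each phase.
BASE = datetime(2018, 1, 1, 0, 0, 1)

def doc_timestamp(day_offset: int) -> str:
    """Derive each indexed document's timestamp from BASE by adding
    whole days, so the relationships between documents are explicit."""
    return (BASE + timedelta(days=day_offset)).isoformat()
```

With this in place, changing or randomizing the base date touches one line rather than every request body.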

Contributor Author

Roger, parameterizing but not-randomizing seems fine with me. Will make the change :)


if (CLUSTER_TYPE == ClusterType.MIXED && Booleans.parseBoolean(System.getProperty("tests.first_round")) == false) {
final Request indexRequest = new Request("POST", "/target/_doc/3");
indexRequest.setJsonEntity("{\"timestamp\":\"2018-01-03T00:00:01\",\"value\":456}");
Contributor

see above comment on timestamps


if (CLUSTER_TYPE == ClusterType.UPGRADED) {
final Request indexRequest = new Request("POST", "/target/_doc/4");
indexRequest.setJsonEntity("{\"timestamp\":\"2018-01-04T00:00:01\",\"value\":567}");
Contributor

see above comment on timestamps :)

Contributor

@pcsanwald pcsanwald left a comment

nice work!

@polyfractal
Contributor Author

Thanks @jimczi @pcsanwald! :)

@polyfractal polyfractal merged commit fc9fb64 into elastic:master Aug 3, 2018
polyfractal added a commit that referenced this pull request Aug 3, 2018
polyfractal added a commit that referenced this pull request Aug 3, 2018
jasontedor added a commit to jasontedor/elasticsearch that referenced this pull request Aug 6, 2018
…pe-detection-with-leading-whitespace

* elastic/master: (34 commits)
  Cross-cluster search: preserve cluster alias in shard failures (elastic#32608)
  Handle AlreadyClosedException when bumping primary term
  [TEST] Allow to run in FIPS JVM (elastic#32607)
  [Test] Add ckb to the list of unsupported languages (elastic#32611)
  SCRIPTING: Move Aggregation Scripts to their own context (elastic#32068)
  Painless: Use LocalMethod Map For Lookup at Runtime (elastic#32599)
  [TEST] Enhance failure message when bulk updates have failures
  [ML] Add ML result classes to protocol library (elastic#32587)
  Suppress LicensingDocumentationIT.testPutLicense in release builds (elastic#32613)
  [Rollup] Update wire version check after backport
  Suppress Wildfly test in FIPS JVMs (elastic#32543)
  [Rollup] Improve ID scheme for rollup documents (elastic#32558)
  ingest: doc: move Dot Expander Processor doc to correct position (elastic#31743)
  [ML] Add some ML config classes to protocol library (elastic#32502)
  [TEST]Split transport verification mode none tests (elastic#32488)
  Core: Move helper date formatters over to java time (elastic#32504)
  [Rollup] Remove builders from DateHistogramGroupConfig (elastic#32555)
  [TEST} unmutes SearchAsyncActionTests and adds debugging info
  [ML] Add Detector config classes to protocol library (elastic#32495)
  [Rollup] Remove builders from MetricConfig (elastic#32536)
  ...
dnhatn added a commit that referenced this pull request Aug 6, 2018
* 6.x:
  [Kerberos] Use canonical host name (#32588)
  Cross-cluster search: preserve cluster alias in shard failures (#32608)
  [TEST] Allow to run in FIPS JVM (#32607)
  Handle AlreadyClosedException when bumping primary term
  [Test] Add ckb to the list of unsupported languages (#32611)
  SCRIPTING: Move Aggregation Scripts to their own context (#32068) (#32629)
  [TEST] Enhance failure message when bulk updates have failures
  [ML] Add ML result classes to protocol library (#32587)
  Suppress LicensingDocumentationIT.testPutLicense in release builds (#32613)
  [Rollup] Improve ID scheme for rollup documents (#32558)
  Mutes failing SQL string function tests due to #32589
  Suppress Wildfly test in FIPS JVMs (#32543)
  Add cluster UUID to Cluster Stats API response (#32206)
  [ML] Add some ML config classes to protocol library (#32502)
  [TEST]Split transport verification mode none tests (#32488)
  [Rollup] Remove builders from DateHistogramGroupConfig (#32555)
  [ML] Add Detector config classes to protocol library (#32495)
  [Rollup] Remove builders from MetricConfig (#32536)
  Fix race between replica reset and primary promotion (#32442)
  HLRC: Move commercial clients from XPackClient (#32596)
  Security: move User to protocol project (#32367)
  Minor fix for javadoc (applicable for java 11). (#32573)
  Painless: Move Some Lookup Logic to PainlessLookup (#32565)
  Core: Minor size reduction for AbstractComponent (#32509)
  INGEST: Enable default pipelines (#32286) (#32591)
  TEST: Avoid merges in testSeqNoAndCheckpoints
  [Rollup] Remove builders from HistoGroupConfig (#32533)
  fixed elements in array of produced terms (#32519)
  Mutes ReindexFailureTests.searchFailure dues to #28053
  Mutes LicensingDocumentationIT due to #32580
  Remove the SATA controller from OpenSUSE box
  [ML] Rename JobProvider to JobResultsProvider (#32551)
dnhatn added a commit that referenced this pull request Aug 6, 2018
* master:
  Cross-cluster search: preserve cluster alias in shard failures (#32608)
  Handle AlreadyClosedException when bumping primary term
  [TEST] Allow to run in FIPS JVM (#32607)
  [Test] Add ckb to the list of unsupported languages (#32611)
  SCRIPTING: Move Aggregation Scripts to their own context (#32068)
  Painless: Use LocalMethod Map For Lookup at Runtime (#32599)
  [TEST] Enhance failure message when bulk updates have failures
  [ML] Add ML result classes to protocol library (#32587)
  Suppress LicensingDocumentationIT.testPutLicense in release builds (#32613)
  [Rollup] Update wire version check after backport
  Suppress Wildfly test in FIPS JVMs (#32543)
  [Rollup] Improve ID scheme for rollup documents (#32558)
  ingest: doc: move Dot Expander Processor doc to correct position (#31743)
  [ML] Add some ML config classes to protocol library (#32502)
  [TEST]Split transport verification mode none tests (#32488)
  Core: Move helper date formatters over to java time (#32504)
  [Rollup] Remove builders from DateHistogramGroupConfig (#32555)
  [TEST} unmutes SearchAsyncActionTests and adds debugging info
  [ML] Add Detector config classes to protocol library (#32495)
  [Rollup] Remove builders from MetricConfig (#32536)
  Tests: Add rolling upgrade tests for watcher (#32428)
  Fix race between replica reset and primary promotion (#32442)
@polyfractal polyfractal added >bug and removed review labels Aug 20, 2018
Labels
>bug :StorageEngine/Rollup Turn fine-grained time-based data into coarser-grained data v6.4.0 v6.5.0 v7.0.0-beta1