Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Rollup] Re-factor Rollup Indexer into a generic indexer for re-usability #32743

Merged
merged 24 commits into from
Aug 29, 2018

Conversation

hendrikmuhs
Copy link
Contributor

@hendrikmuhs hendrikmuhs commented Aug 9, 2018

Motivation

I am working on a feature that is similar to rollup as it will work with the same 2 cycle process of 1st querying a source index and 2nd indexing results to a destination index.

@polyfractal and I discussed this and realised it would be useful if such features could use a common approach to performing these kinds of operations. This PR serves to implement that common approach.

Solution

This PR extracts a super class out of the rollup indexer called the iterative indexer. The implementor of it can define the query, transformation of the response, indexing and the object to persist the position/state of the indexer.

To be discussed

Apart from the approach (I will annotate some open questions in code), naming; The change introduces a package org.elasticsearch.xpack.core.indexing and a class IterativeIndexer - I could not find a better name yet, I am more than happy about suggestions.

Notes

This PR is not fully polished yet as I want to discuss the approach 1st before finalizing the details.

@elasticmachine
Copy link
Collaborator

Pinging @elastic/es-search-aggs

public static final ConstructingObjectParser<RollupJobStats, Void> PARSER =
new ConstructingObjectParser<>(NAME.getPreferredName(),
args -> new RollupJobStats((long) args[0], (long) args[1], (long) args[2], (long) args[3]));
public class RollupJobStats extends IndexerStats {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This class is empty as there is nothing in there specific to rollup right now, but that might change.

Note: I changed field names! So this break BWC for the get rollup jobs api. Seeking advise, whether this is a problem and whether I should implement a compat layer here.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we should use the deprecated names feature of ParseField so we avoid a bwc break here. Its true that rollups is experimental so we can in theory make bwc breaks without notice but I think where its easy for us to deprecate we should 😄

*
* @param <JobPosition> Type that defines a job position to be defined by the implementation.
*/
public abstract class IterativeIndexer<JobPosition> {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Note: The flexibility of JobPosition is a requirement for my usage of this class.

@polyfractal
Copy link
Contributor

/cc @jimczi too

@colings86 colings86 added :StorageEngine/Rollup Turn fine-grained time-based data into coarser-grained data and removed :Analytics/Aggregations Aggregations labels Aug 9, 2018
hendrikmuhs pushed a commit to hendrikmuhs/elasticsearch that referenced this pull request Aug 10, 2018
to be reverted and replaced by final version of this PR
hendrikmuhs pushed a commit that referenced this pull request Aug 10, 2018
This is a temporary (squashed) commit of #32743 till ed4feb6,
to be reverted and replaced by the final version of this PR
builder.field(NUM_DOCUMENTS.getPreferredName(), numDocuments);
builder.field(NUM_ROLLUPS.getPreferredName(), numRollups);
builder.field(NUM_INPUT_DOCUMENTS.getPreferredName(), numInputDocuments);
builder.field(NUM_OUTPUT_DOCUMENTS.getPreferredName(), numOuputDocuments);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Piggybacking on @colings86's comment, I think we might need a conditional here for 6.5 when running under Rollup context? Haven't completely convinced myself this is needed, but I think it is for display purposes.

E.g. pass in some kind of flag in Params (from GetRollupJobsAction#JobWrapper.toXContent()) so that we can use the deprecated name for rollups. There might be a different/better way to handle this, not sure... never done this sort of change before.

Luckily this isn't persisted anywhere, so we don't have to worry about that sort of bwc.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@polyfractal
Copy link
Contributor

Gave it a read through, I like how things are falling out. I was never keen on how big RollupIndexer was, so separating some of this logic out is nice.

Going to let it stew in my brain over the weekend and give it another, deeper look on Monday.

Copy link
Contributor

@colings86 colings86 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I left a few minor comments. I'm not sure at the moment if its safe for the IterativeIndexer to hold and update the state and position independent of the persistent task but I need to think on this a bit more

this.numPages = numPages;
this.numDocuments = numDocuments;
this.numRollups = numRollups;
this.numInputDocuments = numDocuments;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: can we call both sides of this numInputDocuments?

* Result object to hold the result of 1 iteration of iterative indexing.
* Acts as an interface between the implementation and the generic indexer.
*/
public class Iteration<JobPosition> {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

minor: I wonder if we should call this IterationResult as Iteration made me think it was the thing doing the work rather than just holding the result of an iteration?

*/
public boolean isDone() {
return isDone;
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does this not belong in JobPosition?

@hendrikmuhs
Copy link
Contributor Author

addressed the review comments:

  • the output of rollup stats is now reverted back to the original version.
  • various renamings

@@ -84,6 +84,11 @@ public long getOutputDocuments() {
return numOuputDocuments;
}

@Deprecated
public long getNumRollups() {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not entirely positive if this is required... but I think it is. We expose the IndexerJobStats object in the GetRollupJob API, so I think we need to keep this method around because anyone using the Java client could potentially be using it.

I think :)

private static ParseField NUM_ROLLUPS = new ParseField("rollups_indexed");
private static ParseField NUM_INPUT_DOCUMENTS = new ParseField("documents_processed");
// BWC for RollupJobStats
public static String ROLLUP_BWC_XCONTENT_PARAM = "rollup_6_4_compat";
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not in love with this flag name... but I can't think of something better. It will only be exposed on the GetRollupJob API, so I don't think we need "rollup" in the name.

Maybe "old_stats_format" or something? Open to suggestions

@polyfractal
Copy link
Contributor

@colings86 made a few minor tweaks, left some questions. I think this is probably mostly good to go.

I'm not sure at the moment if its safe for the IterativeIndexer to hold and update the state and position independent of the persistent task but I need to think on this a bit more

I think this should be ok, since the RollupIndexer extends IterativeIndexer, and the (allocated) persistent task owns the indexer. If the allocated task is killed the indexer will go with it.

There's probably some undefined behavior here if two instances of an indexer are running at the same time and holding onto the same state/position references. It's probably ok because they were designed for external threads to signal via those atomics, but yunno, not tested :)

Copy link
Contributor

@jimczi jimczi left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looks good to me. I left some minor comments. Regarding the stats naming deprecation I wonder if we really need to flood the log for such a minor change.
The stats api is informal so we should be allowed to change things here as long as we don't break inter-nodes communication, @colings86 @polyfractal ?

*
* @param now The current time in milliseconds passed through from {@link #maybeTriggerAsyncJob(long)}
*/
protected abstract void onStart(long now);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe onStartJob to disambiguate ?

*
* @param <JobPosition> Type that defines a job position to be defined by the implementation.
*/
public abstract class IterativeIndexer<JobPosition> {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The design is for an asynchronous execution so what about AsyncBulkBySearchIndexer ?

Copy link
Contributor

@polyfractal polyfractal Aug 21, 2018

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

++ getting async in the name. Some other ideas too:

  • AsyncTwoPhaseIndexer (phases being search + bulk)
  • AsyncTwoCycleIndexer
  • AsyncSearchDrivenIndexer
  • AsyncSearchTransformingIndexer

I sorta like the two phase one, but otherwise don't really have a preference. AsyncBulkBySearchIndexer is good with me too.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I went with AsyncTwoPhaseIndexer. If anyone feel strongly about the name let me know before it gets merged :)

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

+1

@colings86
Copy link
Contributor

I think its important that we use a consistent approach to all deprecations so I'm keen for us to log to the deprecation logger if the client is not adding the flag to use the new output format so any users programmatically using the stats API are warned of the upcoming breaking change and given the opportunity to migrate to the new response format early.

With IndexerJobStats now abstract, each implementation of of the
AsyncTwoPhaseIndexer can implement their own job stats.  The base
abstract class provides some stats (docs indexed, etc), but allows
each specific implementation the ability to add more stats and control
how they are displayed in XContent.

This removes the need for the BWC flag.
@polyfractal
Copy link
Contributor

@jimczi refactored IndexerJobStats to be abstract as we discussed. There's no longer a bwc break for Rollup, as each implementation of AsyncTwoPhaseIndexer implements their own stats object (along with xcontent handling).

Would you mind taking a look at those changs?

Copy link
Contributor

@jimczi jimczi left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks @polyfractal , glad we can avoid the deprecation. I left some comments regarding the new classes.

}

@Deprecated
public long getNumRollups() {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's remove this entirely, we changed the package and the name so it's already breaking ?

*
* @param <JobPosition> Type that defines a job position to be defined by the implementation.
*/
public abstract class AsyncTwoPhaseIndexer<JobPosition> {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we add IndexerJobStats in the declaration to enforce type (AsyncTwoPhaseIndexer<JobPosition, JobStats extends IndexerJobStats>) ? The client should receive a RollupIndexerJobStats when requesting stats for a rollup job.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

++

/**
* Get the stats of this indexer.
*/
public IndexerJobStats getStats() {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's return the concrete class

Copy link
Contributor

@jimczi jimczi left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I left more comments.

private final RollupJobStatus status;

public static final ConstructingObjectParser<JobWrapper, Void> PARSER
= new ConstructingObjectParser<>(NAME, a -> new JobWrapper((RollupJobConfig) a[0],
(RollupJobStats) a[1], (RollupJobStatus)a[2]));
(IndexerJobStats) a[1], (RollupJobStatus)a[2]));
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We can use the concrete class now ?

PARSER.declareObject(ConstructingObjectParser.constructorArg(), RollupJobStatus.PARSER::apply, STATUS);
}

public JobWrapper(RollupJobConfig job, RollupJobStats stats, RollupJobStatus status) {
public JobWrapper(RollupJobConfig job, IndexerJobStats stats, RollupJobStatus status) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same here, RollupIndexerJobStats

@@ -46,7 +46,7 @@
* @param isUpgradedDocID `true` if this job is using the new ID scheme
* @return A list of rolled documents derived from the response
*/
static List<IndexRequest> processBuckets(CompositeAggregation agg, String rollupIndex, RollupJobStats stats,
static List<IndexRequest> processBuckets(CompositeAggregation agg, String rollupIndex, IndexerJobStats stats,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We can use the concrete classRollupIndexJobStats.

@polyfractal
Copy link
Contributor

Ah, thanks @jimczi. Found a few more uses of the abstract class and swapped them to concrete too.

Copy link
Contributor

@jimczi jimczi left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM, thanks @polyfractal

@polyfractal
Copy link
Contributor

Jenkins, run sample packaging tests

@polyfractal polyfractal merged commit cfc003d into elastic:master Aug 29, 2018
polyfractal pushed a commit that referenced this pull request Aug 29, 2018
…lity (#32743)

This extracts a super class out of the rollup indexer called the AsyncTwoPhaseIterator. 
The implementor of it can define the query, transformation of the response, 
indexing and the object to persist the position/state of the indexer.

The stats object used by the indexer to record progress is also now abstract, allowing
the implementation provide custom stats beyond what the indexer provides.  It also
allows the implementation to decide how the stats are presented (leaves toXContent()
up to the implementation).

This should allow new projects to reuse the search-then-index persistent task that Rollup
uses, but without the restrictions/baggage of how Rollup has to work internally to
satisfy time-based rollups.
dnhatn added a commit that referenced this pull request Sep 1, 2018
* 6.x:
  Mute test watcher usage stats output
  [Rollup] Fix FullClusterRestart test
  TEST: Disable soft-deletes in ParentChildTestCase
  TEST: Disable randomized soft-deletes settings
  Integrates soft-deletes into Elasticsearch (#33222)
  drop `index.shard.check_on_startup: fix` (#32279)
  Fix AwaitsFix issue number
  Mute SmokeTestWatcherWithSecurityIT testsi
  [DOCS] Moves ml folder from x-pack/docs to docs (#33248)
  TEST: mute more SmokeTestWatcherWithSecurityIT tests
  [DOCS] Move rollup APIs to docs (#31450)
  [DOCS] Rename X-Pack Commands section (#33005)
  Fixes SecurityIntegTestCase so it always adds at least one alias (#33296)
  TESTS: Fix Random Fail in MockTcpTransportTests (#33061) (#33307)
  MINOR: Remove Dead Code from PathTrie (#33280) (#33306)
  Fix pom for build-tools (#33300)
  Lazy evaluate java9home (#33301)
  SQL: test coverage for JdbcResultSet (#32813)
  Work around to be able to generate eclipse projects (#33295)
  Different handling for security specific errors in the CLI. Fix for #33230 (#33255)
  [ML] Refactor delimited file structure detection (#33233)
  SQL: Support multi-index format as table identifier (#33278)
  Enable forbiddenapis server java9 (#33245)
  [MUTE] SmokeTestWatcherWithSecurityIT flaky tests
  Add region ISO code to GeoIP Ingest plugin (#31669) (#33276)
  Don't be strict for 6.x
  Update serialization versions for custom IndexMetaData backport
  Replace IndexMetaData.Custom with Map-based custom metadata (#32749)
  Painless: Fix Bindings Bug (#33274)
  SQL: prevent duplicate generation for repeated aggs (#33252)
  TEST: Mute testMonitorClusterHealth
  Fix serialization of empty field capabilities response (#33263)
  Fix nested _source retrieval with includes/excludes (#33180)
  [DOCS] TLS file resources are reloadable (#33258)
  Watcher: Ensure TriggerEngine start replaces existing watches (#33157)
  Ignore module-info in jar hell checks (#33011)
  Fix docs build after #33241
  [DOC] Repository GCS ADC not supported (#33238)
  Upgrade to latest Gradle 4.10  (#32801)
  Fix/30904 cluster formation part2 (#32877)
  Move file-based discovery to core (#33241)
  HLRC: add client side RefreshPolicy (#33209)
  [Kerberos] Add unsupported languages for tests (#33253)
  Watcher: Reload properly on remote shard change (#33167)
  Fix classpath security checks for external tests. (#33066)
  [Rollup] Only allow aggregating on multiples of configured interval (#32052)
  Added deprecation warning for rescore in scroll queries (#33070)
  Apply settings filter to get cluster settings API (#33247)
  [Rollup] Re-factor Rollup Indexer into a generic indexer for re-usability   (#32743)
  HLRC: create base timed request class (#33216)
  HLRC: Use Optional in validation logic (#33104)
  Painless: Add Bindings (#33042)
hendrikmuhs pushed a commit that referenced this pull request Sep 10, 2018
Replace mocked indexer and use AsyncTwoPhaseIndexer (introduced in #32743) instead.
@hendrikmuhs hendrikmuhs deleted the rollup-indexer-refactor branch October 18, 2018 08:29
@jimczi jimczi added v7.0.0-beta1 and removed v7.0.0 labels Feb 7, 2019
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
>refactoring :StorageEngine/Rollup Turn fine-grained time-based data into coarser-grained data v6.5.0 v7.0.0-beta1
Projects
None yet
Development

Successfully merging this pull request may close these issues.

5 participants