From 4946d33ba3e2038f79383baf80f530740e767b59 Mon Sep 17 00:00:00 2001 From: Hendrik Muhs Date: Mon, 10 Sep 2018 14:44:53 +0200 Subject: [PATCH] [ML-Dataframe] Use AsyncTwoPhaseIndexer (#33504) Replace mocked indexer and use AsyncTwoPhaseIndexer (introduced in #32743) instead. --- .../FeatureIndexBuilder.java | 4 +- ...nsportPutFeatureIndexBuilderJobAction.java | 41 +++- .../job/FeatureIndexBuilderIndexer.java | 179 ++++++------------ .../job/FeatureIndexBuilderJob.java | 6 + ...ndexBuilderJobPersistentTasksExecutor.java | 63 ++++++ .../job/FeatureIndexBuilderJobStats.java | 67 +++++++ .../job/FeatureIndexBuilderJobTask.java | 126 +++++++----- 7 files changed, 312 insertions(+), 174 deletions(-) create mode 100644 x-pack/plugin/ml-feature-index-builder/src/main/java/org/elasticsearch/xpack/ml/featureindexbuilder/job/FeatureIndexBuilderJobPersistentTasksExecutor.java create mode 100644 x-pack/plugin/ml-feature-index-builder/src/main/java/org/elasticsearch/xpack/ml/featureindexbuilder/job/FeatureIndexBuilderJobStats.java diff --git a/x-pack/plugin/ml-feature-index-builder/src/main/java/org/elasticsearch/xpack/ml/featureindexbuilder/FeatureIndexBuilder.java b/x-pack/plugin/ml-feature-index-builder/src/main/java/org/elasticsearch/xpack/ml/featureindexbuilder/FeatureIndexBuilder.java index bd9bcd751be6a..01537d493c3e4 100644 --- a/x-pack/plugin/ml-feature-index-builder/src/main/java/org/elasticsearch/xpack/ml/featureindexbuilder/FeatureIndexBuilder.java +++ b/x-pack/plugin/ml-feature-index-builder/src/main/java/org/elasticsearch/xpack/ml/featureindexbuilder/FeatureIndexBuilder.java @@ -36,7 +36,7 @@ import org.elasticsearch.xpack.ml.featureindexbuilder.action.TransportPutFeatureIndexBuilderJobAction; import org.elasticsearch.xpack.ml.featureindexbuilder.action.TransportStartFeatureIndexBuilderJobAction; import org.elasticsearch.xpack.ml.featureindexbuilder.job.FeatureIndexBuilderJob; -import org.elasticsearch.xpack.ml.featureindexbuilder.job.FeatureIndexBuilderJobTask; +import org.elasticsearch.xpack.ml.featureindexbuilder.job.FeatureIndexBuilderJobPersistentTasksExecutor; import org.elasticsearch.xpack.ml.featureindexbuilder.rest.action.RestPutFeatureIndexBuilderJobAction; import org.elasticsearch.xpack.ml.featureindexbuilder.rest.action.RestStartFeatureIndexBuilderJobAction; @@ -127,7 +127,7 @@ public List> getPersistentTasksExecutor(ClusterServic } SchedulerEngine schedulerEngine = new SchedulerEngine(settings, Clock.systemUTC()); - return Collections.singletonList(new FeatureIndexBuilderJobTask.FeatureIndexBuilderJobPersistentTasksExecutor(settings, client, + return Collections.singletonList(new FeatureIndexBuilderJobPersistentTasksExecutor(settings, client, schedulerEngine, threadPool)); } @Override diff --git a/x-pack/plugin/ml-feature-index-builder/src/main/java/org/elasticsearch/xpack/ml/featureindexbuilder/action/TransportPutFeatureIndexBuilderJobAction.java b/x-pack/plugin/ml-feature-index-builder/src/main/java/org/elasticsearch/xpack/ml/featureindexbuilder/action/TransportPutFeatureIndexBuilderJobAction.java index 16c541db7a01d..37252bf82a410 100644 --- a/x-pack/plugin/ml-feature-index-builder/src/main/java/org/elasticsearch/xpack/ml/featureindexbuilder/action/TransportPutFeatureIndexBuilderJobAction.java +++ b/x-pack/plugin/ml-feature-index-builder/src/main/java/org/elasticsearch/xpack/ml/featureindexbuilder/action/TransportPutFeatureIndexBuilderJobAction.java @@ -7,9 +7,11 @@ package org.elasticsearch.xpack.ml.featureindexbuilder.action; import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.admin.indices.create.CreateIndexRequest; import org.elasticsearch.action.support.ActionFilters; import org.elasticsearch.action.support.master.TransportMasterNodeAction; import org.elasticsearch.client.Client; +import org.elasticsearch.client.IndicesAdminClient; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.block.ClusterBlockException; import org.elasticsearch.cluster.block.ClusterBlockLevel; @@ -17,6 +19,7 @@ import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.xcontent.XContentType; import org.elasticsearch.license.LicenseUtils; import org.elasticsearch.license.XPackLicenseState; import org.elasticsearch.persistent.PersistentTasksService; @@ -29,8 +32,14 @@ import org.elasticsearch.xpack.ml.featureindexbuilder.job.FeatureIndexBuilderJob; import org.elasticsearch.xpack.ml.featureindexbuilder.job.FeatureIndexBuilderJobConfig; +import static org.elasticsearch.xpack.core.ml.job.persistence.ElasticsearchMappings.DOC_TYPE; + public class TransportPutFeatureIndexBuilderJobAction extends TransportMasterNodeAction { + + // TODO: hack, to be replaced + private static final String PIVOT_INDEX = "pivot-reviews"; + private final XPackLicenseState licenseState; private final PersistentTasksService persistentTasksService; private final Client client; @@ -67,7 +76,7 @@ protected void masterOperation(Request request, ClusterState clusterState, Actio XPackPlugin.checkReadyForXPackCustomMetadata(clusterState); FeatureIndexBuilderJob job = createFeatureIndexBuilderJob(request.getConfig(), threadPool); - + createIndex(client, job.getConfig().getId()); startPersistentTask(job, listener, persistentTasksService); } @@ -90,4 +99,34 @@ static void startPersistentTask(FeatureIndexBuilderJob job, ActionListener + .put("index.number_of_shards", 1).put("index.number_of_replicas", 0)); + request.mapping(DOC_TYPE, // <1> + "{\n" + + " \"" + DOC_TYPE + "\": {\n" + + " \"properties\": {\n" + + " \"reviewerId\": {\n" + + " \"type\": \"keyword\"\n" + + " },\n" + + " \"avg_rating\": {\n" + + " \"type\": \"integer\"\n" + + " }\n" + + " }\n" + + " }\n" + + "}", // <2> + XContentType.JSON); + IndicesAdminClient adminClient = client.admin().indices(); + adminClient.create(request).actionGet(); + } } diff --git a/x-pack/plugin/ml-feature-index-builder/src/main/java/org/elasticsearch/xpack/ml/featureindexbuilder/job/FeatureIndexBuilderIndexer.java b/x-pack/plugin/ml-feature-index-builder/src/main/java/org/elasticsearch/xpack/ml/featureindexbuilder/job/FeatureIndexBuilderIndexer.java index e5f33c03fb8b2..8ff748ad265d0 100644 --- a/x-pack/plugin/ml-feature-index-builder/src/main/java/org/elasticsearch/xpack/ml/featureindexbuilder/job/FeatureIndexBuilderIndexer.java +++ b/x-pack/plugin/ml-feature-index-builder/src/main/java/org/elasticsearch/xpack/ml/featureindexbuilder/job/FeatureIndexBuilderIndexer.java @@ -7,166 +7,117 @@ package org.elasticsearch.xpack.ml.featureindexbuilder.job; import org.apache.log4j.Logger; -import org.elasticsearch.action.admin.indices.create.CreateIndexRequest; -import org.elasticsearch.action.admin.indices.delete.DeleteIndexRequest; -import org.elasticsearch.action.bulk.BulkRequest; import org.elasticsearch.action.index.IndexRequest; import org.elasticsearch.action.search.SearchRequest; import org.elasticsearch.action.search.SearchResponse; -import org.elasticsearch.client.Client; -import org.elasticsearch.client.IndicesAdminClient; -import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.xcontent.XContentBuilder; -import org.elasticsearch.common.xcontent.XContentType; -import org.elasticsearch.index.IndexNotFoundException; import org.elasticsearch.index.query.MatchAllQueryBuilder; import org.elasticsearch.index.query.QueryBuilder; import org.elasticsearch.search.aggregations.AggregationBuilders; import org.elasticsearch.search.aggregations.bucket.composite.CompositeAggregation; -import org.elasticsearch.search.aggregations.bucket.composite.CompositeAggregation.Bucket; import org.elasticsearch.search.aggregations.bucket.composite.CompositeAggregationBuilder; import org.elasticsearch.search.aggregations.bucket.composite.CompositeValuesSourceBuilder; import org.elasticsearch.search.aggregations.bucket.composite.TermsValuesSourceBuilder; import org.elasticsearch.search.aggregations.metrics.avg.InternalAvg; import org.elasticsearch.search.builder.SearchSourceBuilder; +import org.elasticsearch.xpack.core.indexing.AsyncTwoPhaseIndexer; +import org.elasticsearch.xpack.core.indexing.IndexerState; +import org.elasticsearch.xpack.core.indexing.IterationResult; import java.io.IOException; +import java.io.UncheckedIOException; import java.util.ArrayList; import java.util.List; import java.util.Map; -import java.util.concurrent.ExecutionException; +import java.util.concurrent.Executor; +import java.util.concurrent.atomic.AtomicReference; +import java.util.stream.Collectors; -import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder; import static org.elasticsearch.xpack.core.ml.job.persistence.ElasticsearchMappings.DOC_TYPE; +import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder; -public class FeatureIndexBuilderIndexer { +public abstract class FeatureIndexBuilderIndexer extends AsyncTwoPhaseIndexer, FeatureIndexBuilderJobStats> { private static final String PIVOT_INDEX = "pivot-reviews"; private static final String SOURCE_INDEX = "anonreviews"; private static final Logger logger = Logger.getLogger(FeatureIndexBuilderIndexer.class.getName()); private FeatureIndexBuilderJob job; - private Client client; - public FeatureIndexBuilderIndexer(FeatureIndexBuilderJob job, Client client) { + public FeatureIndexBuilderIndexer(Executor executor, FeatureIndexBuilderJob job, AtomicReference initialState, + Map initialPosition) { + super(executor, initialState, initialPosition, new FeatureIndexBuilderJobStats()); this.job = job; - this.client = client; - logger.info("delete pivot-reviews"); - } - public synchronized void start() { - deleteIndex(client); - - createIndex(client); - - int runs = 0; - - Map after = null; - logger.info("start feature indexing"); - SearchResponse response; - - try { - response = runQuery(client, after); - - CompositeAggregation compositeAggregation = response.getAggregations().get("feature"); - after = compositeAggregation.afterKey(); - - while (after != null) { - indexBuckets(compositeAggregation); - - ++runs; - response = runQuery(client, after); - - compositeAggregation = response.getAggregations().get("feature"); - after = compositeAggregation.afterKey(); - - //after = null; - } - - indexBuckets(compositeAggregation); - } catch (InterruptedException | ExecutionException e) { - logger.error("Failed to build feature index", e); - } - - logger.info("Finished feature indexing"); + @Override + protected String getJobId() { + return job.getConfig().getId(); } - private void indexBuckets(CompositeAggregation compositeAggregation) { - BulkRequest bulkIndexRequest = new BulkRequest(); - try { - for (Bucket b : compositeAggregation.getBuckets()) { + @Override + protected void onStartJob(long now) { + } - InternalAvg avgAgg = b.getAggregations().get("avg_rating"); + @Override + protected IterationResult> doProcess(SearchResponse searchResponse) { + final CompositeAggregation agg = searchResponse.getAggregations().get("feature"); + return new IterationResult<>(processBuckets(agg), agg.afterKey(), agg.getBuckets().isEmpty()); + } - XContentBuilder builder; + /* + * Mocked demo case + * + * TODO: replace with proper implementation + */ + private List processBuckets(CompositeAggregation agg) { + return agg.getBuckets().stream().map(b -> { + InternalAvg avgAgg = b.getAggregations().get("avg_rating"); + XContentBuilder builder; + try { builder = jsonBuilder(); + builder.startObject(); builder.field("reviewerId", b.getKey().get("reviewerId")); builder.field("avg_rating", avgAgg.getValue()); builder.endObject(); - bulkIndexRequest.add(new IndexRequest(PIVOT_INDEX, DOC_TYPE).source(builder)); - + } catch (IOException e) { + throw new UncheckedIOException(e); } - client.bulk(bulkIndexRequest); - } catch (IOException e) { - logger.error("Failed to index", e); - } + + String indexName = PIVOT_INDEX + "_" + job.getConfig().getId(); + IndexRequest request = new IndexRequest(indexName, DOC_TYPE).source(builder); + return request; + }).collect(Collectors.toList()); + } + + @Override + protected SearchRequest buildSearchRequest() { + + final Map position = getPosition(); + SearchRequest request = buildFeatureQuery(position); + return request; } - + /* - * Hardcoded demo case for pivoting + * Mocked demo case + * + * TODO: everything below will be replaced with proper implementation read from job configuration */ - - private static void deleteIndex(Client client) { - DeleteIndexRequest deleteIndex = new DeleteIndexRequest(PIVOT_INDEX); - - IndicesAdminClient adminClient = client.admin().indices(); - try { - adminClient.delete(deleteIndex).actionGet(); - } catch (IndexNotFoundException e) { - } - } - - private static void createIndex(Client client) { - - CreateIndexRequest request = new CreateIndexRequest(PIVOT_INDEX); - request.settings(Settings.builder() // <1> - .put("index.number_of_shards", 1) - .put("index.number_of_replicas", 0) - ); - request.mapping(DOC_TYPE, // <1> - "{\n" + - " \"" + DOC_TYPE + "\": {\n" + - " \"properties\": {\n" + - " \"reviewerId\": {\n" + - " \"type\": \"keyword\"\n" + - " },\n" + - " \"avg_rating\": {\n" + - " \"type\": \"integer\"\n" + - " }\n" + - " }\n" + - " }\n" + - "}", // <2> - XContentType.JSON); - IndicesAdminClient adminClient = client.admin().indices(); - adminClient.create(request).actionGet(); - } - private static SearchRequest buildFeatureQuery(Map after) { QueryBuilder queryBuilder = new MatchAllQueryBuilder(); SearchRequest searchRequest = new SearchRequest(SOURCE_INDEX); - + List> sources = new ArrayList<>(); sources.add(new TermsValuesSourceBuilder("reviewerId").field("reviewerId")); - + CompositeAggregationBuilder compositeAggregation = new CompositeAggregationBuilder("feature", sources); compositeAggregation.size(1000); - + if (after != null) { compositeAggregation.aggregateAfter(after); } - + compositeAggregation.subAggregation(AggregationBuilders.avg("avg_rating").field("rating")); compositeAggregation.subAggregation(AggregationBuilders.cardinality("dc_vendors").field("vendorId")); SearchSourceBuilder sourceBuilder = new SearchSourceBuilder(); @@ -174,21 +125,7 @@ private static SearchRequest buildFeatureQuery(Map after) { sourceBuilder.size(0); sourceBuilder.query(queryBuilder); searchRequest.source(sourceBuilder); - + return searchRequest; - } - - private static SearchResponse runQuery(Client client, Map after) throws InterruptedException, ExecutionException { - - SearchRequest request = buildFeatureQuery(after); - SearchResponse response = client.search(request).get(); - - return response; - } - - private static void indexResult() { - - - } } diff --git a/x-pack/plugin/ml-feature-index-builder/src/main/java/org/elasticsearch/xpack/ml/featureindexbuilder/job/FeatureIndexBuilderJob.java b/x-pack/plugin/ml-feature-index-builder/src/main/java/org/elasticsearch/xpack/ml/featureindexbuilder/job/FeatureIndexBuilderJob.java index 16a4163e8135a..a1edfca2684a4 100644 --- a/x-pack/plugin/ml-feature-index-builder/src/main/java/org/elasticsearch/xpack/ml/featureindexbuilder/job/FeatureIndexBuilderJob.java +++ b/x-pack/plugin/ml-feature-index-builder/src/main/java/org/elasticsearch/xpack/ml/featureindexbuilder/job/FeatureIndexBuilderJob.java @@ -16,6 +16,8 @@ import org.elasticsearch.xpack.core.XPackPlugin; import java.io.IOException; +import java.util.Collections; +import java.util.Map; import java.util.Objects; public class FeatureIndexBuilderJob implements XPackPlugin.XPackPersistentTaskParams { @@ -92,4 +94,8 @@ public boolean equals(Object other) { public int hashCode() { return Objects.hash(config); } + + public Map getHeaders() { + return Collections.emptyMap(); + } } diff --git a/x-pack/plugin/ml-feature-index-builder/src/main/java/org/elasticsearch/xpack/ml/featureindexbuilder/job/FeatureIndexBuilderJobPersistentTasksExecutor.java b/x-pack/plugin/ml-feature-index-builder/src/main/java/org/elasticsearch/xpack/ml/featureindexbuilder/job/FeatureIndexBuilderJobPersistentTasksExecutor.java new file mode 100644 index 0000000000000..fefb383f94b05 --- /dev/null +++ b/x-pack/plugin/ml-feature-index-builder/src/main/java/org/elasticsearch/xpack/ml/featureindexbuilder/job/FeatureIndexBuilderJobPersistentTasksExecutor.java @@ -0,0 +1,63 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ + +package org.elasticsearch.xpack.ml.featureindexbuilder.job; + +import org.elasticsearch.client.Client; +import org.elasticsearch.common.Nullable; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.persistent.AllocatedPersistentTask; +import org.elasticsearch.persistent.PersistentTaskState; +import org.elasticsearch.persistent.PersistentTasksCustomMetaData; +import org.elasticsearch.persistent.PersistentTasksExecutor; +import org.elasticsearch.tasks.TaskId; +import org.elasticsearch.threadpool.ThreadPool; +import org.elasticsearch.xpack.core.scheduler.SchedulerEngine; +import org.elasticsearch.xpack.ml.featureindexbuilder.FeatureIndexBuilder; + +import java.util.Map; + +public class FeatureIndexBuilderJobPersistentTasksExecutor extends PersistentTasksExecutor { + private final Client client; + private final SchedulerEngine schedulerEngine; + private final ThreadPool threadPool; + + public FeatureIndexBuilderJobPersistentTasksExecutor(Settings settings, Client client, SchedulerEngine schedulerEngine, + ThreadPool threadPool) { + super(settings, "xpack/feature_index_builder/job", FeatureIndexBuilder.TASK_THREAD_POOL_NAME); + this.client = client; + this.schedulerEngine = schedulerEngine; + this.threadPool = threadPool; + } + + @Override + protected void nodeOperation(AllocatedPersistentTask task, @Nullable FeatureIndexBuilderJob params, PersistentTaskState state) { + FeatureIndexBuilderJobTask buildTask = (FeatureIndexBuilderJobTask) task; + SchedulerEngine.Job schedulerJob = new SchedulerEngine.Job( + FeatureIndexBuilderJobTask.SCHEDULE_NAME + "_" + params.getConfig().getId(), next()); + + // Note that while the task is added to the scheduler here, the internal state + // will prevent + // it from doing any work until the task is "started" via the StartJob api + schedulerEngine.register(buildTask); + schedulerEngine.add(schedulerJob); + + logger.info("FeatureIndexBuilder job [" + params.getConfig().getId() + "] created."); + } + + static SchedulerEngine.Schedule next() { + return (startTime, now) -> { + return now + 1000; // to be fixed, hardcode something + }; + } + + @Override + protected AllocatedPersistentTask createTask(long id, String type, String action, TaskId parentTaskId, + PersistentTasksCustomMetaData.PersistentTask persistentTask, Map headers) { + return new FeatureIndexBuilderJobTask(id, type, action, parentTaskId, persistentTask.getParams(), + (FeatureIndexBuilderJobState) persistentTask.getState(), client, schedulerEngine, threadPool, headers); + } +} \ No newline at end of file diff --git a/x-pack/plugin/ml-feature-index-builder/src/main/java/org/elasticsearch/xpack/ml/featureindexbuilder/job/FeatureIndexBuilderJobStats.java b/x-pack/plugin/ml-feature-index-builder/src/main/java/org/elasticsearch/xpack/ml/featureindexbuilder/job/FeatureIndexBuilderJobStats.java new file mode 100644 index 0000000000000..a7c9392800f09 --- /dev/null +++ b/x-pack/plugin/ml-feature-index-builder/src/main/java/org/elasticsearch/xpack/ml/featureindexbuilder/job/FeatureIndexBuilderJobStats.java @@ -0,0 +1,67 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ + +package org.elasticsearch.xpack.ml.featureindexbuilder.job; + +import org.elasticsearch.common.ParseField; +import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.xcontent.ConstructingObjectParser; +import org.elasticsearch.common.xcontent.XContentBuilder; +import org.elasticsearch.common.xcontent.XContentParser; +import org.elasticsearch.xpack.core.indexing.IndexerJobStats; + +import java.io.IOException; + +import static org.elasticsearch.common.xcontent.ConstructingObjectParser.constructorArg; + +public class FeatureIndexBuilderJobStats extends IndexerJobStats { + private static ParseField NUM_PAGES = new ParseField("pages_processed"); + private static ParseField NUM_INPUT_DOCUMENTS = new ParseField("documents_processed"); + private static ParseField NUM_OUTPUT_DOCUMENTS = new ParseField("documents_indexed"); + private static ParseField NUM_INVOCATIONS = new ParseField("trigger_count"); + + public static final ConstructingObjectParser PARSER = new ConstructingObjectParser<>( + NAME.getPreferredName(), + args -> new FeatureIndexBuilderJobStats((long) args[0], (long) args[1], (long) args[2], (long) args[3])); + + static { + PARSER.declareLong(constructorArg(), NUM_PAGES); + PARSER.declareLong(constructorArg(), NUM_INPUT_DOCUMENTS); + PARSER.declareLong(constructorArg(), NUM_OUTPUT_DOCUMENTS); + PARSER.declareLong(constructorArg(), NUM_INVOCATIONS); + } + + public FeatureIndexBuilderJobStats() { + super(); + } + + public FeatureIndexBuilderJobStats(long numPages, long numInputDocuments, long numOuputDocuments, long numInvocations) { + super(numPages, numInputDocuments, numOuputDocuments, numInvocations); + } + + public FeatureIndexBuilderJobStats(StreamInput in) throws IOException { + super(in); + } + + @Override + public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { + builder.startObject(); + builder.field(NUM_PAGES.getPreferredName(), numPages); + builder.field(NUM_INPUT_DOCUMENTS.getPreferredName(), numInputDocuments); + builder.field(NUM_OUTPUT_DOCUMENTS.getPreferredName(), numOuputDocuments); + builder.field(NUM_INVOCATIONS.getPreferredName(), numInvocations); + builder.endObject(); + return builder; + } + + public static FeatureIndexBuilderJobStats fromXContent(XContentParser parser) { + try { + return PARSER.parse(parser, null); + } catch (IOException e) { + throw new RuntimeException(e); + } + } +} diff --git a/x-pack/plugin/ml-feature-index-builder/src/main/java/org/elasticsearch/xpack/ml/featureindexbuilder/job/FeatureIndexBuilderJobTask.java b/x-pack/plugin/ml-feature-index-builder/src/main/java/org/elasticsearch/xpack/ml/featureindexbuilder/job/FeatureIndexBuilderJobTask.java index a4de3927e5bc1..381e57e9027ca 100644 --- a/x-pack/plugin/ml-feature-index-builder/src/main/java/org/elasticsearch/xpack/ml/featureindexbuilder/job/FeatureIndexBuilderJobTask.java +++ b/x-pack/plugin/ml-feature-index-builder/src/main/java/org/elasticsearch/xpack/ml/featureindexbuilder/job/FeatureIndexBuilderJobTask.java @@ -8,81 +8,47 @@ import org.apache.log4j.Logger; import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.bulk.BulkAction; +import org.elasticsearch.action.bulk.BulkRequest; +import org.elasticsearch.action.bulk.BulkResponse; +import org.elasticsearch.action.search.SearchAction; +import org.elasticsearch.action.search.SearchRequest; +import org.elasticsearch.action.search.SearchResponse; import org.elasticsearch.client.Client; -import org.elasticsearch.common.Nullable; -import org.elasticsearch.common.settings.Settings; import org.elasticsearch.persistent.AllocatedPersistentTask; -import org.elasticsearch.persistent.PersistentTaskState; -import org.elasticsearch.persistent.PersistentTasksCustomMetaData; -import org.elasticsearch.persistent.PersistentTasksExecutor; import org.elasticsearch.tasks.TaskId; import org.elasticsearch.threadpool.ThreadPool; +import org.elasticsearch.xpack.core.ClientHelper; +import org.elasticsearch.xpack.core.indexing.IndexerState; import org.elasticsearch.xpack.core.scheduler.SchedulerEngine; import org.elasticsearch.xpack.core.scheduler.SchedulerEngine.Event; -import org.elasticsearch.xpack.ml.featureindexbuilder.FeatureIndexBuilder; import org.elasticsearch.xpack.ml.featureindexbuilder.action.StartFeatureIndexBuilderJobAction; import org.elasticsearch.xpack.ml.featureindexbuilder.action.StartFeatureIndexBuilderJobAction.Response; import java.util.Map; +import java.util.concurrent.atomic.AtomicReference; public class FeatureIndexBuilderJobTask extends AllocatedPersistentTask implements SchedulerEngine.Listener { private static final Logger logger = Logger.getLogger(FeatureIndexBuilderJobTask.class.getName()); + + private final FeatureIndexBuilderJob job; + private final ThreadPool threadPool; private final FeatureIndexBuilderIndexer indexer; static final String SCHEDULE_NAME = "xpack/feature_index_builder/job" + "/schedule"; - public static class FeatureIndexBuilderJobPersistentTasksExecutor extends PersistentTasksExecutor { - private final Client client; - private final SchedulerEngine schedulerEngine; - private final ThreadPool threadPool; - - public FeatureIndexBuilderJobPersistentTasksExecutor(Settings settings, Client client, SchedulerEngine schedulerEngine, - ThreadPool threadPool) { - super(settings, "xpack/feature_index_builder/job", FeatureIndexBuilder.TASK_THREAD_POOL_NAME); - this.client = client; - this.schedulerEngine = schedulerEngine; - this.threadPool = threadPool; - } - - @Override - protected void nodeOperation(AllocatedPersistentTask task, @Nullable FeatureIndexBuilderJob params, PersistentTaskState state) { - FeatureIndexBuilderJobTask buildTask = (FeatureIndexBuilderJobTask) task; - SchedulerEngine.Job schedulerJob = new SchedulerEngine.Job(SCHEDULE_NAME + "_" + params.getConfig().getId(), next()); - - // Note that while the task is added to the scheduler here, the internal state - // will prevent - // it from doing any work until the task is "started" via the StartJob api - schedulerEngine.register(buildTask); - schedulerEngine.add(schedulerJob); - - logger.info("FeatureIndexBuilder job [" + params.getConfig().getId() + "] created."); - } - - static SchedulerEngine.Schedule next() { - return (startTime, now) -> { - return now + 1000; // to be fixed, hardcode something - }; - } - - @Override - protected AllocatedPersistentTask createTask(long id, String type, String action, TaskId parentTaskId, - PersistentTasksCustomMetaData.PersistentTask persistentTask, Map headers) { - return new FeatureIndexBuilderJobTask(id, type, action, parentTaskId, persistentTask.getParams(), - (FeatureIndexBuilderJobState) persistentTask.getState(), client, schedulerEngine, threadPool, headers); - } - } - - private final FeatureIndexBuilderJob job; - public FeatureIndexBuilderJobTask(long id, String type, String action, TaskId parentTask, FeatureIndexBuilderJob job, FeatureIndexBuilderJobState state, Client client, SchedulerEngine schedulerEngine, ThreadPool threadPool, Map headers) { super(id, type, action, "" + "_" + job.getConfig().getId(), parentTask, headers); this.job = job; + this.threadPool = threadPool; logger.info("construct job task"); // todo: simplistic implementation for now - this.indexer = new FeatureIndexBuilderIndexer(job, client); + IndexerState initialState = IndexerState.STOPPED; + Map initialPosition = null; + this.indexer = new ClientFeatureIndexBuilderIndexer(job, new AtomicReference<>(initialState), initialPosition, client); } public FeatureIndexBuilderJobConfig getConfig() { @@ -96,6 +62,66 @@ public synchronized void start(ActionListener listener) { @Override public void triggered(Event event) { + if (event.getJobName().equals(SCHEDULE_NAME + "_" + job.getConfig().getId())) { + logger.debug( + "FeatureIndexBuilder indexer [" + event.getJobName() + "] schedule has triggered, state: [" + indexer.getState() + "]"); + indexer.maybeTriggerAsyncJob(System.currentTimeMillis()); + } } + protected class ClientFeatureIndexBuilderIndexer extends FeatureIndexBuilderIndexer { + private final Client client; + + public ClientFeatureIndexBuilderIndexer(FeatureIndexBuilderJob job, AtomicReference initialState, + Map initialPosition, Client client) { + super(threadPool.executor(ThreadPool.Names.GENERIC), job, initialState, initialPosition); + this.client = client; + } + + @Override + protected void doNextSearch(SearchRequest request, ActionListener nextPhase) { + ClientHelper.executeWithHeadersAsync(job.getHeaders(), ClientHelper.ML_ORIGIN, client, SearchAction.INSTANCE, request, + nextPhase); + } + + @Override + protected void doNextBulk(BulkRequest request, ActionListener nextPhase) { + ClientHelper.executeWithHeadersAsync(job.getHeaders(), ClientHelper.ML_ORIGIN, client, BulkAction.INSTANCE, request, nextPhase); + } + + @Override + protected void doSaveState(IndexerState indexerState, Map position, Runnable next) { + if (indexerState.equals(IndexerState.ABORTING)) { + // If we're aborting, just invoke `next` (which is likely an onFailure handler) + next.run(); + } else { + // to be implemented + + final FeatureIndexBuilderJobState state = new FeatureIndexBuilderJobState(indexerState); + logger.info("Updating persistent state of job [" + job.getConfig().getId() + "] to [" + state.toString() + "]"); + + // TODO: we can not persist the state right now, need to be called from the task + updatePersistentTaskState(state, ActionListener.wrap(task -> next.run(), exc -> { + // We failed to update the persistent task for some reason, + // set our flag back to what it was before + next.run(); + })); + } + } + + @Override + protected void onFailure(Exception exc) { + logger.warn("FeatureIndexBuilder job [" + job.getConfig().getId() + "] failed with an exception: ", exc); + } + + @Override + protected void onFinish() { + logger.info("Finished indexing for job [" + job.getConfig().getId() + "]"); + } + + @Override + protected void onAbort() { + logger.info("FeatureIndexBuilder job [" + job.getConfig().getId() + "] received abort request, stopping indexer"); + } + } }