[ML-Dataframe] Use AsyncTwoPhaseIndexer (#33504)
Replace the mocked indexer with AsyncTwoPhaseIndexer (introduced in #32743).
Hendrik Muhs authored Sep 10, 2018
1 parent b2195fb commit 4946d33
Showing 7 changed files with 312 additions and 174 deletions.
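
For orientation before the per-file diffs: the heart of the change is that FeatureIndexBuilderIndexer no longer runs its own search/aggregate/bulk loop but extends AsyncTwoPhaseIndexer, which owns the indexer state machine and the search-then-index cycle. Below is a minimal sketch of that pattern, limited to the overrides that appear in this diff; the class name SketchFeatureIndexer and the dummy return values are hypothetical, and AsyncTwoPhaseIndexer's remaining abstract hooks (executing the search and bulk requests, saving state, failure handling) are omitted here.

package org.elasticsearch.xpack.ml.featureindexbuilder.job;

import java.util.Collections;
import java.util.Map;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicReference;

import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.search.aggregations.bucket.composite.CompositeAggregation;
import org.elasticsearch.xpack.core.indexing.AsyncTwoPhaseIndexer;
import org.elasticsearch.xpack.core.indexing.IndexerState;
import org.elasticsearch.xpack.core.indexing.IterationResult;

// Sketch only: the shape of an AsyncTwoPhaseIndexer subclass as used in this commit.
// The base class drives the cycle buildSearchRequest() -> execute search ->
// doProcess() -> bulk-index the returned IndexRequests -> repeat, carrying
// progress in the position type (here Map<String, Object>, the composite
// aggregation "after" key).
abstract class SketchFeatureIndexer extends AsyncTwoPhaseIndexer<Map<String, Object>, FeatureIndexBuilderJobStats> {

    SketchFeatureIndexer(Executor executor, AtomicReference<IndexerState> initialState, Map<String, Object> initialPosition) {
        super(executor, initialState, initialPosition, new FeatureIndexBuilderJobStats());
    }

    @Override
    protected String getJobId() {
        return "sketch-job"; // the real indexer returns the id from the job configuration
    }

    @Override
    protected void onStartJob(long now) {
        // nothing to prepare in this sketch
    }

    @Override
    protected SearchRequest buildSearchRequest() {
        // The real implementation builds the composite-aggregation query,
        // resuming from getPosition() (the last "after" key).
        return new SearchRequest();
    }

    @Override
    protected IterationResult<Map<String, Object>> doProcess(SearchResponse searchResponse) {
        // The real implementation turns composite buckets into IndexRequests;
        // afterKey() becomes the next position, and an empty bucket list means done.
        CompositeAggregation agg = searchResponse.getAggregations().get("feature");
        return new IterationResult<>(Collections.emptyList(), agg.afterKey(), agg.getBuckets().isEmpty());
    }
}
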
@@ -36,7 +36,7 @@
import org.elasticsearch.xpack.ml.featureindexbuilder.action.TransportPutFeatureIndexBuilderJobAction;
import org.elasticsearch.xpack.ml.featureindexbuilder.action.TransportStartFeatureIndexBuilderJobAction;
import org.elasticsearch.xpack.ml.featureindexbuilder.job.FeatureIndexBuilderJob;
import org.elasticsearch.xpack.ml.featureindexbuilder.job.FeatureIndexBuilderJobTask;
import org.elasticsearch.xpack.ml.featureindexbuilder.job.FeatureIndexBuilderJobPersistentTasksExecutor;
import org.elasticsearch.xpack.ml.featureindexbuilder.rest.action.RestPutFeatureIndexBuilderJobAction;
import org.elasticsearch.xpack.ml.featureindexbuilder.rest.action.RestStartFeatureIndexBuilderJobAction;

@@ -127,7 +127,7 @@ public List<PersistentTasksExecutor<?>> getPersistentTasksExecutor(ClusterServic
}

SchedulerEngine schedulerEngine = new SchedulerEngine(settings, Clock.systemUTC());
return Collections.singletonList(new FeatureIndexBuilderJobTask.FeatureIndexBuilderJobPersistentTasksExecutor(settings, client,
return Collections.singletonList(new FeatureIndexBuilderJobPersistentTasksExecutor(settings, client,
schedulerEngine, threadPool));
}
@Override
@@ -7,16 +7,19 @@
package org.elasticsearch.xpack.ml.featureindexbuilder.action;

import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.admin.indices.create.CreateIndexRequest;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.master.TransportMasterNodeAction;
import org.elasticsearch.client.Client;
import org.elasticsearch.client.IndicesAdminClient;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.block.ClusterBlockException;
import org.elasticsearch.cluster.block.ClusterBlockLevel;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.license.LicenseUtils;
import org.elasticsearch.license.XPackLicenseState;
import org.elasticsearch.persistent.PersistentTasksService;
@@ -29,8 +32,14 @@
import org.elasticsearch.xpack.ml.featureindexbuilder.job.FeatureIndexBuilderJob;
import org.elasticsearch.xpack.ml.featureindexbuilder.job.FeatureIndexBuilderJobConfig;

import static org.elasticsearch.xpack.core.ml.job.persistence.ElasticsearchMappings.DOC_TYPE;

public class TransportPutFeatureIndexBuilderJobAction
extends TransportMasterNodeAction<PutFeatureIndexBuilderJobAction.Request, PutFeatureIndexBuilderJobAction.Response> {

// TODO: hack, to be replaced
private static final String PIVOT_INDEX = "pivot-reviews";

private final XPackLicenseState licenseState;
private final PersistentTasksService persistentTasksService;
private final Client client;
@@ -67,7 +76,7 @@ protected void masterOperation(Request request, ClusterState clusterState, Actio
XPackPlugin.checkReadyForXPackCustomMetadata(clusterState);

FeatureIndexBuilderJob job = createFeatureIndexBuilderJob(request.getConfig(), threadPool);

createIndex(client, job.getConfig().getId());
startPersistentTask(job, listener, persistentTasksService);
}

@@ -90,4 +99,34 @@ static void startPersistentTask(FeatureIndexBuilderJob job, ActionListener<PutFe
protected ClusterBlockException checkBlock(PutFeatureIndexBuilderJobAction.Request request, ClusterState state) {
return state.blocks().globalBlockedException(ClusterBlockLevel.METADATA_WRITE);
}

/*
* Mocked demo case
*
* TODO: everything below will be replaced with proper implementation read from job configuration
*/
private static void createIndex(Client client, String suffix) {

String indexName = PIVOT_INDEX + "_" + suffix;
CreateIndexRequest request = new CreateIndexRequest(indexName);

request.settings(Settings.builder() // <1>
.put("index.number_of_shards", 1).put("index.number_of_replicas", 0));
request.mapping(DOC_TYPE, // <1>
"{\n" +
" \"" + DOC_TYPE + "\": {\n" +
" \"properties\": {\n" +
" \"reviewerId\": {\n" +
" \"type\": \"keyword\"\n" +
" },\n" +
" \"avg_rating\": {\n" +
" \"type\": \"integer\"\n" +
" }\n" +
" }\n" +
" }\n" +
"}", // <2>
XContentType.JSON);
IndicesAdminClient adminClient = client.admin().indices();
adminClient.create(request).actionGet();
}
}
@@ -7,188 +7,125 @@
package org.elasticsearch.xpack.ml.featureindexbuilder.job;

import org.apache.log4j.Logger;
import org.elasticsearch.action.admin.indices.create.CreateIndexRequest;
import org.elasticsearch.action.admin.indices.delete.DeleteIndexRequest;
import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.client.Client;
import org.elasticsearch.client.IndicesAdminClient;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.index.IndexNotFoundException;
import org.elasticsearch.index.query.MatchAllQueryBuilder;
import org.elasticsearch.index.query.QueryBuilder;
import org.elasticsearch.search.aggregations.AggregationBuilders;
import org.elasticsearch.search.aggregations.bucket.composite.CompositeAggregation;
import org.elasticsearch.search.aggregations.bucket.composite.CompositeAggregation.Bucket;
import org.elasticsearch.search.aggregations.bucket.composite.CompositeAggregationBuilder;
import org.elasticsearch.search.aggregations.bucket.composite.CompositeValuesSourceBuilder;
import org.elasticsearch.search.aggregations.bucket.composite.TermsValuesSourceBuilder;
import org.elasticsearch.search.aggregations.metrics.avg.InternalAvg;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.elasticsearch.xpack.core.indexing.AsyncTwoPhaseIndexer;
import org.elasticsearch.xpack.core.indexing.IndexerState;
import org.elasticsearch.xpack.core.indexing.IterationResult;

import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;

import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
import static org.elasticsearch.xpack.core.ml.job.persistence.ElasticsearchMappings.DOC_TYPE;
import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;

public class FeatureIndexBuilderIndexer {
public abstract class FeatureIndexBuilderIndexer extends AsyncTwoPhaseIndexer<Map<String, Object>, FeatureIndexBuilderJobStats> {
private static final String PIVOT_INDEX = "pivot-reviews";
private static final String SOURCE_INDEX = "anonreviews";

private static final Logger logger = Logger.getLogger(FeatureIndexBuilderIndexer.class.getName());
private FeatureIndexBuilderJob job;
private Client client;

public FeatureIndexBuilderIndexer(FeatureIndexBuilderJob job, Client client) {
public FeatureIndexBuilderIndexer(Executor executor, FeatureIndexBuilderJob job, AtomicReference<IndexerState> initialState,
Map<String, Object> initialPosition) {
super(executor, initialState, initialPosition, new FeatureIndexBuilderJobStats());

this.job = job;
this.client = client;
logger.info("delete pivot-reviews");

}

public synchronized void start() {
deleteIndex(client);

createIndex(client);

int runs = 0;

Map<String, Object> after = null;
logger.info("start feature indexing");
SearchResponse response;

try {
response = runQuery(client, after);

CompositeAggregation compositeAggregation = response.getAggregations().get("feature");
after = compositeAggregation.afterKey();

while (after != null) {
indexBuckets(compositeAggregation);

++runs;
response = runQuery(client, after);

compositeAggregation = response.getAggregations().get("feature");
after = compositeAggregation.afterKey();

//after = null;
}

indexBuckets(compositeAggregation);
} catch (InterruptedException | ExecutionException e) {
logger.error("Failed to build feature index", e);
}

logger.info("Finished feature indexing");
@Override
protected String getJobId() {
return job.getConfig().getId();
}

private void indexBuckets(CompositeAggregation compositeAggregation) {
BulkRequest bulkIndexRequest = new BulkRequest();
try {
for (Bucket b : compositeAggregation.getBuckets()) {
@Override
protected void onStartJob(long now) {
}

InternalAvg avgAgg = b.getAggregations().get("avg_rating");
@Override
protected IterationResult<Map<String, Object>> doProcess(SearchResponse searchResponse) {
final CompositeAggregation agg = searchResponse.getAggregations().get("feature");
return new IterationResult<>(processBuckets(agg), agg.afterKey(), agg.getBuckets().isEmpty());
}

XContentBuilder builder;
/*
* Mocked demo case
*
* TODO: replace with proper implementation
*/
private List<IndexRequest> processBuckets(CompositeAggregation agg) {
return agg.getBuckets().stream().map(b -> {
InternalAvg avgAgg = b.getAggregations().get("avg_rating");
XContentBuilder builder;
try {
builder = jsonBuilder();

builder.startObject();
builder.field("reviewerId", b.getKey().get("reviewerId"));
builder.field("avg_rating", avgAgg.getValue());
builder.endObject();
bulkIndexRequest.add(new IndexRequest(PIVOT_INDEX, DOC_TYPE).source(builder));

} catch (IOException e) {
throw new UncheckedIOException(e);
}
client.bulk(bulkIndexRequest);
} catch (IOException e) {
logger.error("Failed to index", e);
}

String indexName = PIVOT_INDEX + "_" + job.getConfig().getId();
IndexRequest request = new IndexRequest(indexName, DOC_TYPE).source(builder);
return request;
}).collect(Collectors.toList());
}

@Override
protected SearchRequest buildSearchRequest() {

final Map<String, Object> position = getPosition();
SearchRequest request = buildFeatureQuery(position);
return request;
}

/*
* Hardcoded demo case for pivoting
* Mocked demo case
*
* TODO: everything below will be replaced with proper implementation read from job configuration
*/

private static void deleteIndex(Client client) {
DeleteIndexRequest deleteIndex = new DeleteIndexRequest(PIVOT_INDEX);

IndicesAdminClient adminClient = client.admin().indices();
try {
adminClient.delete(deleteIndex).actionGet();
} catch (IndexNotFoundException e) {
}
}

private static void createIndex(Client client) {

CreateIndexRequest request = new CreateIndexRequest(PIVOT_INDEX);
request.settings(Settings.builder() // <1>
.put("index.number_of_shards", 1)
.put("index.number_of_replicas", 0)
);
request.mapping(DOC_TYPE, // <1>
"{\n" +
" \"" + DOC_TYPE + "\": {\n" +
" \"properties\": {\n" +
" \"reviewerId\": {\n" +
" \"type\": \"keyword\"\n" +
" },\n" +
" \"avg_rating\": {\n" +
" \"type\": \"integer\"\n" +
" }\n" +
" }\n" +
" }\n" +
"}", // <2>
XContentType.JSON);
IndicesAdminClient adminClient = client.admin().indices();
adminClient.create(request).actionGet();
}

private static SearchRequest buildFeatureQuery(Map<String, Object> after) {
QueryBuilder queryBuilder = new MatchAllQueryBuilder();
SearchRequest searchRequest = new SearchRequest(SOURCE_INDEX);

List<CompositeValuesSourceBuilder<?>> sources = new ArrayList<>();
sources.add(new TermsValuesSourceBuilder("reviewerId").field("reviewerId"));

CompositeAggregationBuilder compositeAggregation = new CompositeAggregationBuilder("feature", sources);
compositeAggregation.size(1000);

if (after != null) {
compositeAggregation.aggregateAfter(after);
}

compositeAggregation.subAggregation(AggregationBuilders.avg("avg_rating").field("rating"));
compositeAggregation.subAggregation(AggregationBuilders.cardinality("dc_vendors").field("vendorId"));
SearchSourceBuilder sourceBuilder = new SearchSourceBuilder();
sourceBuilder.aggregation(compositeAggregation);
sourceBuilder.size(0);
sourceBuilder.query(queryBuilder);
searchRequest.source(sourceBuilder);

return searchRequest;
}

private static SearchResponse runQuery(Client client, Map<String, Object> after) throws InterruptedException, ExecutionException {

SearchRequest request = buildFeatureQuery(after);
SearchResponse response = client.search(request).get();

return response;
}

private static void indexResult() {



}
}
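
The task-side wiring (FeatureIndexBuilderJobTask and FeatureIndexBuilderJobPersistentTasksExecutor) is among the changed files not expanded in this view. Roughly, a caller drives the new indexer through the lifecycle methods AsyncTwoPhaseIndexer brought along from #32743. A hedged sketch follows, assuming start() and maybeTriggerAsyncJob(long) behave as in that base class and reusing the hypothetical SketchFeatureIndexer from above; the helper class itself is not part of this commit.

package org.elasticsearch.xpack.ml.featureindexbuilder.job;

// Sketch only (hypothetical helper): how a persistent task might drive an
// AsyncTwoPhaseIndexer-based indexer. start() moves the state machine from
// STOPPED to STARTED; each scheduler trigger then calls maybeTriggerAsyncJob(now),
// which kicks off one asynchronous search -> process -> bulk-index cycle
// unless the indexer is already running.
class SketchIndexerDriver {

    static void startAndTrigger(SketchFeatureIndexer indexer) {
        indexer.start();                                          // STOPPED -> STARTED
        indexer.maybeTriggerAsyncJob(System.currentTimeMillis()); // run one async cycle if idle
    }
}
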
@@ -16,6 +16,8 @@
import org.elasticsearch.xpack.core.XPackPlugin;

import java.io.IOException;
import java.util.Collections;
import java.util.Map;
import java.util.Objects;

public class FeatureIndexBuilderJob implements XPackPlugin.XPackPersistentTaskParams {
@@ -92,4 +94,8 @@ public boolean equals(Object other) {
public int hashCode() {
return Objects.hash(config);
}

public Map<String, String> getHeaders() {
return Collections.emptyMap();
}
}