backport 936faba to 6.x (#35045)
* backport 936faba to 6.x

* address code review comments
Paul Sanwald authored Oct 31, 2018
1 parent 62f8da2 commit 362ab15
Showing 12 changed files with 425 additions and 22 deletions.
client/rest-high-level/src/main/java/org/elasticsearch/client/RollupClient.java
@@ -20,6 +20,8 @@
package org.elasticsearch.client;

import org.elasticsearch.action.ActionListener;
import org.elasticsearch.client.rollup.DeleteRollupJobRequest;
import org.elasticsearch.client.rollup.DeleteRollupJobResponse;
import org.elasticsearch.client.rollup.GetRollupCapsRequest;
import org.elasticsearch.client.rollup.GetRollupCapsResponse;
import org.elasticsearch.client.rollup.GetRollupJobRequest;
@@ -114,6 +116,40 @@ public void startRollupJobAsync(StartRollupJobRequest request, RequestOptions op
listener, Collections.emptySet());
}

/**
* Delete a rollup job from the cluster.
* See <a href="https://www.elastic.co/guide/en/elasticsearch/reference/current/rollup-delete-job.html">
* the docs</a> for more.
* @param request the request
* @param options the request options (e.g. headers), use {@link RequestOptions#DEFAULT} if nothing needs to be customized
* @return the response
* @throws IOException in case there is a problem sending the request or parsing back the response
*/
public DeleteRollupJobResponse deleteRollupJob(DeleteRollupJobRequest request, RequestOptions options) throws IOException {
return restHighLevelClient.performRequestAndParseEntity(request,
RollupRequestConverters::deleteJob,
options,
DeleteRollupJobResponse::fromXContent,
Collections.emptySet());
}

/**
* Asynchronously delete a rollup job from the cluster.
* See <a href="https://www.elastic.co/guide/en/elasticsearch/reference/current/rollup-delete-job.html">
* the docs</a> for details.
* @param request the request
* @param options the request options (e.g. headers), use {@link RequestOptions#DEFAULT} if nothing needs to be customized
* @param listener the listener to be notified upon request completion
*/
public void deleteRollupJobAsync(DeleteRollupJobRequest request,
RequestOptions options,
ActionListener<DeleteRollupJobResponse> listener) {
restHighLevelClient.performRequestAsyncAndParseEntity(request,
RollupRequestConverters::deleteJob,
options,
DeleteRollupJobResponse::fromXContent,
listener, Collections.emptySet());
}

/**
* Get a rollup job from the cluster.
* See <a href="https://www.elastic.co/guide/en/elasticsearch/reference/current/rollup-put-job.html">
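Not part of the commit: a minimal usage sketch of the two methods added above. It assumes a RestHighLevelClient named client has already been constructed and that a rollup job with the id "my_rollup_job" exists; both names are illustrative.

// Sketch only: `client` and the job id "my_rollup_job" are assumptions, not from this commit.
DeleteRollupJobRequest deleteRequest = new DeleteRollupJobRequest("my_rollup_job");

// Synchronous variant added above.
DeleteRollupJobResponse deleteResponse = client.rollup().deleteRollupJob(deleteRequest, RequestOptions.DEFAULT);
boolean acknowledged = deleteResponse.isAcknowledged();

// Asynchronous variant added above; the listener is notified on completion or failure.
client.rollup().deleteRollupJobAsync(deleteRequest, RequestOptions.DEFAULT,
    new ActionListener<DeleteRollupJobResponse>() {
        @Override
        public void onResponse(DeleteRollupJobResponse response) {
            // inspect response.isAcknowledged()
        }

        @Override
        public void onFailure(Exception e) {
            // a missing job surfaces here, e.g. as an exception carrying a 404 status
        }
    });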
client/rest-high-level/src/main/java/org/elasticsearch/client/RollupRequestConverters.java
@@ -18,10 +18,12 @@
*/
package org.elasticsearch.client;

import org.apache.http.client.methods.HttpDelete;
import org.apache.http.client.methods.HttpGet;
import org.apache.http.client.methods.HttpPost;
import org.apache.http.client.methods.HttpPut;
import org.elasticsearch.client.rollup.GetRollupCapsRequest;
import org.elasticsearch.client.rollup.DeleteRollupJobRequest;
import org.elasticsearch.client.rollup.GetRollupJobRequest;
import org.elasticsearch.client.rollup.PutRollupJobRequest;
import org.elasticsearch.client.rollup.StartRollupJobRequest;
@@ -74,4 +76,16 @@ static Request getRollupCaps(final GetRollupCapsRequest getRollupCapsRequest) th
request.setEntity(createEntity(getRollupCapsRequest, REQUEST_BODY_CONTENT_TYPE));
return request;
}

static Request deleteJob(final DeleteRollupJobRequest deleteRollupJobRequest) throws IOException {
String endpoint = new RequestConverters.EndpointBuilder()
.addPathPartAsIs("_xpack")
.addPathPartAsIs("rollup")
.addPathPartAsIs("job")
.addPathPart(deleteRollupJobRequest.getId())
.build();
Request request = new Request(HttpDelete.METHOD_NAME, endpoint);
request.setEntity(createEntity(deleteRollupJobRequest, REQUEST_BODY_CONTENT_TYPE));
return request;
}
}
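Not part of the commit: a rough sketch of what the deleteJob converter above produces. The job id is an example, the leading slash on the endpoint assumes the usual EndpointBuilder behavior, and since deleteJob is package-private this would have to live in the org.elasticsearch.client package.

// Sketch: exercise the converter added above (deleteJob declares IOException, so a caller handles or propagates it).
DeleteRollupJobRequest deleteRollupJobRequest = new DeleteRollupJobRequest("my_rollup_job");
Request lowLevelRequest = RollupRequestConverters.deleteJob(deleteRollupJobRequest);

// Expected shape of the low-level request:
//   method   -> HttpDelete.METHOD_NAME ("DELETE")
//   endpoint -> "/_xpack/rollup/job/my_rollup_job"
//   entity   -> the JSON body written by DeleteRollupJobRequest#toXContent, i.e. {"id":"my_rollup_job"}
String method = lowLevelRequest.getMethod();
String endpoint = lowLevelRequest.getEndpoint();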
client/rest-high-level/src/main/java/org/elasticsearch/client/rollup/AcknowledgedResponse.java
@@ -32,9 +32,9 @@
import static org.elasticsearch.common.xcontent.ConstructingObjectParser.constructorArg;

public abstract class AcknowledgedResponse implements ToXContentObject {
private final boolean acknowledged;

protected static final String PARSE_FIELD_NAME = "acknowledged";
private final boolean acknowledged;

public AcknowledgedResponse(final boolean acknowledged) {
this.acknowledged = acknowledged;
@@ -83,4 +83,5 @@ public XContentBuilder toXContent(XContentBuilder builder, ToXContent.Params par
protected String getFieldName() {
return PARSE_FIELD_NAME;
}

}
client/rest-high-level/src/main/java/org/elasticsearch/client/rollup/DeleteRollupJobRequest.java
@@ -0,0 +1,79 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.client.rollup;

import org.elasticsearch.client.Validatable;
import org.elasticsearch.common.ParseField;
import org.elasticsearch.common.xcontent.ConstructingObjectParser;
import org.elasticsearch.common.xcontent.ToXContentObject;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentParser;

import java.io.IOException;
import java.util.Objects;


public class DeleteRollupJobRequest implements Validatable, ToXContentObject {

private static final ParseField ID_FIELD = new ParseField("id");
private final String id;


public DeleteRollupJobRequest(String id) {
this.id = Objects.requireNonNull(id, "id parameter must not be null");
}

public String getId() {
return id;
}

private static final ConstructingObjectParser<DeleteRollupJobRequest, Void> PARSER =
new ConstructingObjectParser<>("request", a -> {
return new DeleteRollupJobRequest((String) a[0]);
});

static {
PARSER.declareString(ConstructingObjectParser.constructorArg(), ID_FIELD);
}

public static DeleteRollupJobRequest fromXContent(XContentParser parser) {
return PARSER.apply(parser, null);
}

@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.startObject();
builder.field(ID_FIELD.getPreferredName(), this.id);
builder.endObject();
return builder;
}

@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
DeleteRollupJobRequest that = (DeleteRollupJobRequest) o;
return Objects.equals(id, that.id);
}

@Override
public int hashCode() {
return Objects.hash(id);
}
}
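Not part of the commit: the body this request serializes to, rendered with the Strings.toString helper from org.elasticsearch.common.Strings (assumed to be on the classpath); the id is an example value.

// Sketch: render the JSON body produced by toXContent above.
String body = Strings.toString(new DeleteRollupJobRequest("my_rollup_job"));
// body -> {"id":"my_rollup_job"}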
client/rest-high-level/src/main/java/org/elasticsearch/client/rollup/DeleteRollupJobResponse.java
@@ -0,0 +1,39 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.elasticsearch.client.rollup;

import org.elasticsearch.common.xcontent.ConstructingObjectParser;
import org.elasticsearch.common.xcontent.XContentParser;

import java.io.IOException;

public class DeleteRollupJobResponse extends AcknowledgedResponse {

public DeleteRollupJobResponse(boolean acknowledged) {
super(acknowledged);
}

private static final ConstructingObjectParser<DeleteRollupJobResponse, Void> PARSER = AcknowledgedResponse
.generateParser("delete_rollup_job_response", DeleteRollupJobResponse::new, AcknowledgedResponse.PARSE_FIELD_NAME);

public static DeleteRollupJobResponse fromXContent(final XContentParser parser) throws IOException {
return PARSER.parse(parser, null);
}
}
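Not part of the commit: a sketch of feeding a typical acknowledged body through fromXContent above. The JSON literal and the parser construction (NamedXContentRegistry.EMPTY, DeprecationHandler.THROW_UNSUPPORTED_OPERATION) are illustrative assumptions; the snippet belongs in a method that declares IOException.

// Sketch: parse {"acknowledged": true} into the new response type.
String json = "{\"acknowledged\":true}";
try (XContentParser parser = XContentType.JSON.xContent().createParser(
        NamedXContentRegistry.EMPTY, DeprecationHandler.THROW_UNSUPPORTED_OPERATION, json)) {
    DeleteRollupJobResponse response = DeleteRollupJobResponse.fromXContent(parser);
    boolean acknowledged = response.isAcknowledged(); // true
}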
client/rest-high-level/src/main/java/org/elasticsearch/client/rollup/PutRollupJobResponse.java
@@ -23,17 +23,18 @@

import java.io.IOException;


public class PutRollupJobResponse extends AcknowledgedResponse {


public PutRollupJobResponse(boolean acknowledged) {
super(acknowledged);
}

private static final ConstructingObjectParser<PutRollupJobResponse, Void> PARSER = AcknowledgedResponse
.generateParser("delete_rollup_job_response", PutRollupJobResponse::new, AcknowledgedResponse.PARSE_FIELD_NAME);
.generateParser("delete_rollup_job_response", PutRollupJobResponse::new, AcknowledgedResponse.PARSE_FIELD_NAME);

public static PutRollupJobResponse fromXContent(final XContentParser parser) throws IOException {
return PARSER.parse(parser, null);
}

}
client/rest-high-level/src/test/java/org/elasticsearch/client/RollupIT.java
@@ -19,6 +19,7 @@
package org.elasticsearch.client;

import org.elasticsearch.action.admin.cluster.health.ClusterHealthRequest;
import org.elasticsearch.ElasticsearchStatusException;
import org.elasticsearch.action.admin.indices.refresh.RefreshRequest;
import org.elasticsearch.action.admin.indices.refresh.RefreshResponse;
import org.elasticsearch.action.bulk.BulkItemResponse;
@@ -34,6 +35,8 @@
import org.elasticsearch.client.rollup.GetRollupJobResponse;
import org.elasticsearch.client.rollup.GetRollupJobResponse.IndexerState;
import org.elasticsearch.client.rollup.GetRollupJobResponse.JobWrapper;
import org.elasticsearch.client.rollup.DeleteRollupJobRequest;
import org.elasticsearch.client.rollup.DeleteRollupJobResponse;
import org.elasticsearch.client.rollup.PutRollupJobRequest;
import org.elasticsearch.client.rollup.PutRollupJobResponse;
import org.elasticsearch.client.rollup.RollableIndexCaps;
@@ -53,6 +56,7 @@
import org.elasticsearch.search.aggregations.metrics.min.MinAggregationBuilder;
import org.elasticsearch.search.aggregations.metrics.sum.SumAggregationBuilder;
import org.elasticsearch.search.aggregations.metrics.valuecount.ValueCountAggregationBuilder;
import org.junit.Before;

import java.util.Arrays;
import java.util.Collections;
@@ -69,18 +73,35 @@
import static org.hamcrest.Matchers.greaterThan;
import static org.hamcrest.Matchers.hasKey;
import static org.hamcrest.Matchers.hasSize;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.lessThan;

public class RollupIT extends ESRestHighLevelClientTestCase {

double sum = 0.0d;
int max = Integer.MIN_VALUE;
int min = Integer.MAX_VALUE;
private static final List<String> SUPPORTED_METRICS = Arrays.asList(MaxAggregationBuilder.NAME, MinAggregationBuilder.NAME,
SumAggregationBuilder.NAME, AvgAggregationBuilder.NAME, ValueCountAggregationBuilder.NAME);

public void testPutStartAndGetRollupJob() throws Exception {
double sum = 0.0d;
int max = Integer.MIN_VALUE;
int min = Integer.MAX_VALUE;
SumAggregationBuilder.NAME, AvgAggregationBuilder.NAME, ValueCountAggregationBuilder.NAME);

private String id;
private String indexPattern;
private String rollupIndex;
private String cron;
private int pageSize;
private int numDocs;

@Before
public void init() throws Exception {
id = randomAlphaOfLength(10);
indexPattern = randomFrom("docs", "d*", "doc*");
rollupIndex = randomFrom("rollup", "test");
cron = "*/1 * * * * ?";
numDocs = indexDocs();
pageSize = randomIntBetween(numDocs, numDocs * 10);
}

public int indexDocs() throws Exception {
final BulkRequest bulkRequest = new BulkRequest();
bulkRequest.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE);
for (int minute = 0; minute < 60; minute++) {
@@ -120,12 +141,33 @@ public void testPutStartAndGetRollupJob() throws Exception {

RefreshResponse refreshResponse = highLevelClient().indices().refresh(new RefreshRequest("docs"), RequestOptions.DEFAULT);
assertEquals(0, refreshResponse.getFailedShards());
return numDocs;
}

final String id = randomAlphaOfLength(10);
final String indexPattern = randomFrom("docs", "d*", "doc*");
final String rollupIndex = randomFrom("rollup", "test");
final String cron = "*/1 * * * * ?";
final int pageSize = randomIntBetween(numDocs, numDocs * 10);

public void testDeleteRollupJob() throws Exception {
final GroupConfig groups = new GroupConfig(new DateHistogramGroupConfig("date", DateHistogramInterval.DAY));
final List<MetricConfig> metrics = Collections.singletonList(new MetricConfig("value", SUPPORTED_METRICS));
final TimeValue timeout = TimeValue.timeValueSeconds(randomIntBetween(30, 600));
PutRollupJobRequest putRollupJobRequest =
new PutRollupJobRequest(new RollupJobConfig(id, indexPattern, rollupIndex, cron, pageSize, groups, metrics, timeout));
final RollupClient rollupClient = highLevelClient().rollup();
PutRollupJobResponse response = execute(putRollupJobRequest, rollupClient::putRollupJob, rollupClient::putRollupJobAsync);
DeleteRollupJobRequest deleteRollupJobRequest = new DeleteRollupJobRequest(id);
DeleteRollupJobResponse deleteRollupJobResponse = highLevelClient().rollup()
.deleteRollupJob(deleteRollupJobRequest, RequestOptions.DEFAULT);
assertTrue(deleteRollupJobResponse.isAcknowledged());
}

public void testDeleteMissingRollupJob() {
DeleteRollupJobRequest deleteRollupJobRequest = new DeleteRollupJobRequest(randomAlphaOfLength(10));
ElasticsearchStatusException responseException = expectThrows(ElasticsearchStatusException.class, () -> highLevelClient().rollup()
.deleteRollupJob(deleteRollupJobRequest, RequestOptions.DEFAULT));
assertThat(responseException.status().getStatus(), is(404));
}

@SuppressWarnings("unchecked")
public void testPutAndGetRollupJob() throws Exception {
// TODO expand this to also test with histogram and terms?
final GroupConfig groups = new GroupConfig(new DateHistogramGroupConfig("date", DateHistogramInterval.DAY));
final List<MetricConfig> metrics = Collections.singletonList(new MetricConfig("value", SUPPORTED_METRICS));
@@ -142,9 +184,6 @@ public void testPutStartAndGetRollupJob() throws Exception {
StartRollupJobResponse startResponse = execute(startRequest, rollupClient::startRollupJob, rollupClient::startRollupJobAsync);
assertTrue(startResponse.isAcknowledged());

int finalMin = min;
int finalMax = max;
double finalSum = sum;
assertBusy(() -> {
SearchResponse searchResponse = highLevelClient().search(new SearchRequest(rollupIndex), RequestOptions.DEFAULT);
assertEquals(0, searchResponse.getFailedShards());
Expand All @@ -162,13 +201,13 @@ public void testPutStartAndGetRollupJob() throws Exception {
for (String name : metric.getMetrics()) {
Number value = (Number) source.get(metric.getField() + "." + name + ".value");
if ("min".equals(name)) {
assertEquals(finalMin, value.intValue());
assertEquals(min, value.intValue());
} else if ("max".equals(name)) {
assertEquals(finalMax, value.intValue());
assertEquals(max, value.intValue());
} else if ("sum".equals(name)) {
assertEquals(finalSum, value.doubleValue(), 0.0d);
assertEquals(sum, value.doubleValue(), 0.0d);
} else if ("avg".equals(name)) {
assertEquals(finalSum, value.doubleValue(), 0.0d);
assertEquals(sum, value.doubleValue(), 0.0d);
Number avgCount = (Number) source.get(metric.getField() + "." + name + "._count");
assertEquals(numDocs, avgCount.intValue());
} else if ("value_count".equals(name)) {
