Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

REST high-level client: add support for Rollover Index API #28698

Merged
merged 15 commits into from
Feb 20, 2018
Merged
Show file tree
Hide file tree
Changes from 12 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,8 @@
import org.elasticsearch.action.admin.indices.mapping.put.PutMappingResponse;
import org.elasticsearch.action.admin.indices.open.OpenIndexRequest;
import org.elasticsearch.action.admin.indices.open.OpenIndexResponse;
import org.elasticsearch.action.admin.indices.rollover.RolloverRequest;
import org.elasticsearch.action.admin.indices.rollover.RolloverResponse;
import org.elasticsearch.action.admin.indices.shrink.ResizeRequest;
import org.elasticsearch.action.admin.indices.shrink.ResizeResponse;

Expand Down Expand Up @@ -272,7 +274,7 @@ public void shrinkAsync(ResizeRequest resizeRequest, ActionListener<ResizeRespon
* Splits an index using the Split Index API
* <p>
* See <a href="https://www.elastic.co/guide/en/elasticsearch/reference/current/indices-split-index.html">
* Shrink Index API on elastic.co</a>
* Split Index API on elastic.co</a>
*/
public ResizeResponse split(ResizeRequest resizeRequest, Header... headers) throws IOException {
return restHighLevelClient.performRequestAndParseEntity(resizeRequest, Request::split, ResizeResponse::fromXContent,
Expand All @@ -289,4 +291,26 @@ public void splitAsync(ResizeRequest resizeRequest, ActionListener<ResizeRespons
restHighLevelClient.performRequestAsyncAndParseEntity(resizeRequest, Request::split, ResizeResponse::fromXContent,
listener, emptySet(), headers);
}

/**
* Rolls over an index using the Rollover Index API
* <p>
* See <a href="https://www.elastic.co/guide/en/elasticsearch/reference/current/indices-rollover-index.html">
* Rollover Index API on elastic.co</a>
*/
public RolloverResponse rollover(RolloverRequest rolloverRequest, Header... headers) throws IOException {
return restHighLevelClient.performRequestAndParseEntity(rolloverRequest, Request::rollover, RolloverResponse::fromXContent,
emptySet(), headers);
}

/**
* Asynchronously rolls over an index using the Rollover Index API
* <p>
* See <a href="https://www.elastic.co/guide/en/elasticsearch/reference/current/indices-rollover-index.html">
* Rollover Index API on elastic.co</a>
*/
public void rolloverAsync(RolloverRequest rolloverRequest, ActionListener<RolloverResponse> listener, Header... headers) {
restHighLevelClient.performRequestAsyncAndParseEntity(rolloverRequest, Request::rollover, RolloverResponse::fromXContent,
listener, emptySet(), headers);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
import org.elasticsearch.action.admin.indices.get.GetIndexRequest;
import org.elasticsearch.action.admin.indices.mapping.put.PutMappingRequest;
import org.elasticsearch.action.admin.indices.open.OpenIndexRequest;
import org.elasticsearch.action.admin.indices.rollover.RolloverRequest;
import org.elasticsearch.action.admin.indices.shrink.ResizeRequest;
import org.elasticsearch.action.admin.indices.shrink.ResizeType;
import org.elasticsearch.action.bulk.BulkRequest;
Expand Down Expand Up @@ -498,11 +499,10 @@ static Request existsAlias(GetAliasesRequest getAliasesRequest) {
}

static Request rankEval(RankEvalRequest rankEvalRequest) throws IOException {
// TODO maybe indices should be propery of RankEvalRequest and not of the spec
// TODO maybe indices should be property of RankEvalRequest and not of the spec
List<String> indices = rankEvalRequest.getRankEvalSpec().getIndices();
String endpoint = endpoint(indices.toArray(new String[indices.size()]), Strings.EMPTY_ARRAY, "_rank_eval");
HttpEntity entity = null;
entity = createEntity(rankEvalRequest.getRankEvalSpec(), REQUEST_BODY_CONTENT_TYPE);
HttpEntity entity = createEntity(rankEvalRequest.getRankEvalSpec(), REQUEST_BODY_CONTENT_TYPE);
return new Request(HttpGet.METHOD_NAME, endpoint, Collections.emptyMap(), entity);
}

Expand Down Expand Up @@ -542,6 +542,19 @@ static Request clusterPutSettings(ClusterUpdateSettingsRequest clusterUpdateSett
return new Request(HttpPut.METHOD_NAME, endpoint, parameters.getParams(), entity);
}

static Request rollover(RolloverRequest rolloverRequest) throws IOException {
Params params = Params.builder();
params.withTimeout(rolloverRequest.timeout());
params.withMasterTimeout(rolloverRequest.masterNodeTimeout());
params.withWaitForActiveShards(rolloverRequest.getCreateIndexRequest().waitForActiveShards());
if (rolloverRequest.isDryRun()) {
params.putParam("dry_run", Boolean.TRUE.toString());
}
String endpoint = buildEndpoint(rolloverRequest.getAlias(), "_rollover", rolloverRequest.getNewIndexName());
HttpEntity entity = createEntity(rolloverRequest, REQUEST_BODY_CONTENT_TYPE);
return new Request(HttpPost.METHOD_NAME, endpoint, params.getParams(), entity);
}

private static HttpEntity createEntity(ToXContent toXContent, XContentType xContentType) throws IOException {
BytesRef source = XContentHelper.toXContent(toXContent, xContentType, false).toBytesRef();
return new ByteArrayEntity(source.bytes, source.offset, source.length, createContentType(xContentType));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,11 +39,18 @@
import org.elasticsearch.action.admin.indices.mapping.put.PutMappingResponse;
import org.elasticsearch.action.admin.indices.open.OpenIndexRequest;
import org.elasticsearch.action.admin.indices.open.OpenIndexResponse;
import org.elasticsearch.action.admin.indices.rollover.RolloverRequest;
import org.elasticsearch.action.admin.indices.rollover.RolloverResponse;
import org.elasticsearch.action.admin.indices.shrink.ResizeRequest;
import org.elasticsearch.action.admin.indices.shrink.ResizeResponse;
import org.elasticsearch.action.admin.indices.shrink.ResizeType;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.support.IndicesOptions;
import org.elasticsearch.action.support.WriteRequest;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.ByteSizeUnit;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.json.JsonXContent;
import org.elasticsearch.common.xcontent.support.XContentMapValues;
Expand Down Expand Up @@ -435,4 +442,57 @@ public void testSplit() throws IOException {
Map<String, Object> aliasData = (Map<String, Object>)XContentMapValues.extractValue("target.aliases.alias", getIndexResponse);
assertNotNull(aliasData);
}

public void testRollover() throws IOException {
highLevelClient().indices().create(new CreateIndexRequest("test").alias(new Alias("alias")));
RolloverRequest rolloverRequest = new RolloverRequest("alias", "test_new");
rolloverRequest.addMaxIndexDocsCondition(1);

{
RolloverResponse rolloverResponse = execute(rolloverRequest, highLevelClient().indices()::rollover,
highLevelClient().indices()::rolloverAsync);
assertFalse(rolloverResponse.isRolledOver());
assertFalse(rolloverResponse.isDryRun());
Map<String, Boolean> conditionStatus = rolloverResponse.getConditionStatus();
assertEquals(1, conditionStatus.size());
assertFalse(conditionStatus.get("[max_docs: 1]"));
assertEquals("test", rolloverResponse.getOldIndex());
assertEquals("test_new", rolloverResponse.getNewIndex());
}

highLevelClient().index(new IndexRequest("test", "type", "1").source("field", "value"));
highLevelClient().index(new IndexRequest("test", "type", "2").source("field", "value")
.setRefreshPolicy(WriteRequest.RefreshPolicy.WAIT_UNTIL));
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just curious why we need the refresh policy here. Maybe worth a comment.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The rollover API doesn't see the proper number of docs otherwise. At least that's why I think the test fails without it.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you add this explanation as a short comment then? Thanks.

//without the refresh the rollover may not happen as the number of docs seen may be off
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

thanks


{
rolloverRequest.addMaxIndexAgeCondition(new TimeValue(1));
rolloverRequest.dryRun(true);
RolloverResponse rolloverResponse = execute(rolloverRequest, highLevelClient().indices()::rollover,
highLevelClient().indices()::rolloverAsync);
assertFalse(rolloverResponse.isRolledOver());
assertTrue(rolloverResponse.isDryRun());
Map<String, Boolean> conditionStatus = rolloverResponse.getConditionStatus();
assertEquals(2, conditionStatus.size());
assertTrue(conditionStatus.get("[max_docs: 1]"));
assertTrue(conditionStatus.get("[max_age: 1ms]"));
assertEquals("test", rolloverResponse.getOldIndex());
assertEquals("test_new", rolloverResponse.getNewIndex());
}
{
rolloverRequest.dryRun(false);
rolloverRequest.addMaxIndexSizeCondition(new ByteSizeValue(1, ByteSizeUnit.MB));
RolloverResponse rolloverResponse = execute(rolloverRequest, highLevelClient().indices()::rollover,
highLevelClient().indices()::rolloverAsync);
assertTrue(rolloverResponse.isRolledOver());
assertFalse(rolloverResponse.isDryRun());
Map<String, Boolean> conditionStatus = rolloverResponse.getConditionStatus();
assertEquals(3, conditionStatus.size());
assertTrue(conditionStatus.get("[max_docs: 1]"));
assertTrue(conditionStatus.get("[max_age: 1ms]"));
assertFalse(conditionStatus.get("[max_size: 1mb]"));
assertEquals("test", rolloverResponse.getOldIndex());
assertEquals("test_new", rolloverResponse.getNewIndex());
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
import org.elasticsearch.action.admin.indices.get.GetIndexRequest;
import org.elasticsearch.action.admin.indices.mapping.put.PutMappingRequest;
import org.elasticsearch.action.admin.indices.open.OpenIndexRequest;
import org.elasticsearch.action.admin.indices.rollover.RolloverRequest;
import org.elasticsearch.action.admin.indices.shrink.ResizeRequest;
import org.elasticsearch.action.admin.indices.shrink.ResizeType;
import org.elasticsearch.action.bulk.BulkRequest;
Expand Down Expand Up @@ -74,6 +75,7 @@
import org.elasticsearch.common.xcontent.XContentHelper;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.index.RandomCreateIndexGenerator;
import org.elasticsearch.index.VersionType;
import org.elasticsearch.index.query.TermQueryBuilder;
import org.elasticsearch.index.rankeval.PrecisionAtK;
Expand Down Expand Up @@ -1116,11 +1118,9 @@ private static void resizeTest(ResizeType resizeType, CheckedFunction<ResizeRequ
if (randomBoolean()) {
randomAliases(createIndexRequest);
}
setRandomWaitForActiveShards(createIndexRequest::waitForActiveShards, expectedParams);
resizeRequest.setTargetIndex(createIndexRequest);
} else {
setRandomWaitForActiveShards(resizeRequest::setWaitForActiveShards, expectedParams);
}
setRandomWaitForActiveShards(resizeRequest::setWaitForActiveShards, expectedParams);

Request request = function.apply(resizeRequest);
assertEquals(HttpPut.METHOD_NAME, request.getMethod());
Expand All @@ -1144,6 +1144,44 @@ public void testClusterPutSettings() throws IOException {
assertEquals(expectedParams, expectedRequest.getParameters());
}

public void testRollover() throws IOException {
RolloverRequest rolloverRequest = new RolloverRequest(randomAlphaOfLengthBetween(3, 10),
randomBoolean() ? null : randomAlphaOfLengthBetween(3, 10));
Map<String, String> expectedParams = new HashMap<>();
setRandomTimeout(rolloverRequest::timeout, rolloverRequest.timeout(), expectedParams);
setRandomMasterTimeout(rolloverRequest, expectedParams);
if (randomBoolean()) {
rolloverRequest.dryRun(randomBoolean());
if (rolloverRequest.isDryRun()) {
expectedParams.put("dry_run", "true");
}
}
if (randomBoolean()) {
rolloverRequest.addMaxIndexAgeCondition(new TimeValue(randomNonNegativeLong()));
}
if (randomBoolean()) {
String type = randomAlphaOfLengthBetween(3, 10);
rolloverRequest.getCreateIndexRequest().mapping(type, RandomCreateIndexGenerator.randomMapping(type));
}
if (randomBoolean()) {
RandomCreateIndexGenerator.randomAliases(rolloverRequest.getCreateIndexRequest());
}
if (randomBoolean()) {
rolloverRequest.getCreateIndexRequest().settings(RandomCreateIndexGenerator.randomIndexSettings());
}
setRandomWaitForActiveShards(rolloverRequest.getCreateIndexRequest()::waitForActiveShards, expectedParams);

Request request = Request.rollover(rolloverRequest);
if (rolloverRequest.getNewIndexName() == null) {
assertEquals("/" + rolloverRequest.getAlias() + "/_rollover", request.getEndpoint());
} else {
assertEquals("/" + rolloverRequest.getAlias() + "/_rollover/" + rolloverRequest.getNewIndexName(), request.getEndpoint());
}
assertEquals(HttpPost.METHOD_NAME, request.getMethod());
assertToXContentBody(rolloverRequest, request.getEntity());
assertEquals(expectedParams, request.getParameters());
}

private static void assertToXContentBody(ToXContent expectedBody, HttpEntity actualEntity) throws IOException {
BytesReference expectedBytes = XContentHelper.toXContent(expectedBody, REQUEST_BODY_CONTENT_TYPE, false);
assertEquals(XContentType.JSON.mediaTypeWithoutParameters(), actualEntity.getContentType().getValue());
Expand Down
Loading