Skip to content

Commit

Permalink
Add API to execute SLM policy on demand (#41038)
Browse files Browse the repository at this point in the history
This commit adds the ability to perform a snapshot on demand for a policy. This
can be useful to take a snapshot immediately prior to performing some sort of
maintenance.

```json
PUT /_ilm/snapshot/<policy>/_execute
```

And it returns the response with the generated snapshot name:

```json
{
  "snapshot_name" : "production-snap-2019.04.09-rfyv3j9qreixkdbnfuw0ug"
}
```

Note that this does not allow waiting for the snapshot, and the snapshot could
still fail. It *does* record this information into the cluster state similar to
a regularly trigged SLM job.

Relates to #38461
  • Loading branch information
dakrone authored Apr 12, 2019
1 parent e564b48 commit 1b4574a
Show file tree
Hide file tree
Showing 8 changed files with 366 additions and 15 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -171,6 +171,7 @@
import org.elasticsearch.xpack.core.security.authz.privilege.ConditionalClusterPrivilege;
import org.elasticsearch.xpack.core.security.authz.privilege.ConditionalClusterPrivileges;
import org.elasticsearch.xpack.core.security.transport.netty4.SecurityNetty4Transport;
import org.elasticsearch.xpack.core.snapshotlifecycle.action.ExecuteSnapshotLifecycleAction;
import org.elasticsearch.xpack.core.sql.SqlFeatureSetUsage;
import org.elasticsearch.xpack.core.ssl.SSLService;
import org.elasticsearch.xpack.core.ssl.action.GetCertificateInfoAction;
Expand Down Expand Up @@ -370,6 +371,7 @@ public List<Action<? extends ActionResponse>> getClientActions() {
PutSnapshotLifecycleAction.INSTANCE,
GetSnapshotLifecycleAction.INSTANCE,
DeleteSnapshotLifecycleAction.INSTANCE,
ExecuteSnapshotLifecycleAction.INSTANCE,
// Freeze
TransportFreezeIndexAction.FreezeIndexAction.INSTANCE
);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,135 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/

package org.elasticsearch.xpack.core.snapshotlifecycle.action;

import org.elasticsearch.action.Action;
import org.elasticsearch.action.ActionRequestValidationException;
import org.elasticsearch.action.ActionResponse;
import org.elasticsearch.action.support.master.AcknowledgedRequest;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.common.xcontent.ToXContentObject;
import org.elasticsearch.common.xcontent.XContentBuilder;

import java.io.IOException;
import java.util.Objects;

/**
* Action used to manually invoke a create snapshot request for a given
* snapshot lifecycle policy regardless of schedule.
*/
public class ExecuteSnapshotLifecycleAction extends Action<ExecuteSnapshotLifecycleAction.Response> {
public static final ExecuteSnapshotLifecycleAction INSTANCE = new ExecuteSnapshotLifecycleAction();
public static final String NAME = "cluster:admin/ilm/snapshot/execute";

protected ExecuteSnapshotLifecycleAction() {
super(NAME);
}

@Override
public ExecuteSnapshotLifecycleAction.Response newResponse() {
throw new UnsupportedOperationException();
}

@Override
public Writeable.Reader<ExecuteSnapshotLifecycleAction.Response> getResponseReader() {
return Response::new;
}

public static class Request extends AcknowledgedRequest<Request> implements ToXContentObject {

private String lifecycleId;

public Request(String lifecycleId) {
this.lifecycleId = lifecycleId;
}

public Request() { }

public String getLifecycleId() {
return this.lifecycleId;
}

@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
lifecycleId = in.readString();
}

@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
out.writeString(lifecycleId);
}

@Override
public ActionRequestValidationException validate() {
return null;
}

@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.startObject();
builder.endObject();
return builder;
}

@Override
public int hashCode() {
return Objects.hash(lifecycleId);
}

@Override
public boolean equals(Object obj) {
if (obj == null) {
return false;
}
if (obj.getClass() != getClass()) {
return false;
}
Request other = (Request) obj;
return lifecycleId.equals(other.lifecycleId);
}

@Override
public String toString() {
return Strings.toString(this);
}
}

public static class Response extends ActionResponse implements ToXContentObject {

private final String snapshotName;

public Response(String snapshotName) {
this.snapshotName = snapshotName;
}

public String getSnapshotName() {
return this.snapshotName;
}

public Response(StreamInput in) throws IOException {
this(in.readString());
}

@Override
public void writeTo(StreamOutput out) throws IOException {
out.writeString(this.snapshotName);
}

@Override
public final XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.startObject();
builder.field("snapshot_name", getSnapshotName());
builder.endObject();
return builder;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,12 @@
import org.elasticsearch.client.ResponseException;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.xcontent.DeprecationHandler;
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
import org.elasticsearch.common.xcontent.ToXContent;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentHelper;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.common.xcontent.json.JsonXContent;
import org.elasticsearch.test.rest.ESRestTestCase;
Expand Down Expand Up @@ -152,6 +155,57 @@ public void testPolicyFailure() throws Exception {
assertOK(client().performRequest(delReq));
}

public void testPolicyManualExecution() throws Exception {
final String indexName = "test";
final String policyName = "test-policy";
final String repoId = "my-repo";
int docCount = randomIntBetween(10, 50);
List<IndexRequestBuilder> indexReqs = new ArrayList<>();
for (int i = 0; i < docCount; i++) {
index(client(), indexName, "" + i, "foo", "bar");
}

// Create a snapshot repo
inializeRepo(repoId);

createSnapshotPolicy(policyName, "snap", "1 2 3 4 5 ?", repoId, indexName, true);

ResponseException badResp = expectThrows(ResponseException.class,
() -> client().performRequest(new Request("PUT", "/_ilm/snapshot/" + policyName + "-bad/_execute")));
assertThat(EntityUtils.toString(badResp.getResponse().getEntity()),
containsString("no such snapshot lifecycle policy [" + policyName + "-bad]"));

Response goodResp = client().performRequest(new Request("PUT", "/_ilm/snapshot/" + policyName + "/_execute"));

try (XContentParser parser = JsonXContent.jsonXContent.createParser(NamedXContentRegistry.EMPTY,
DeprecationHandler.THROW_UNSUPPORTED_OPERATION, EntityUtils.toByteArray(goodResp.getEntity()))) {
final String snapshotName = parser.mapStrings().get("snapshot_name");

// Check that the executed snapshot is created
assertBusy(() -> {
try {
Response response = client().performRequest(new Request("GET", "/_snapshot/" + repoId + "/" + snapshotName));
Map<String, Object> snapshotResponseMap;
try (InputStream is = response.getEntity().getContent()) {
snapshotResponseMap = XContentHelper.convertToMap(XContentType.JSON.xContent(), is, true);
}
assertThat(snapshotResponseMap.size(), greaterThan(0));
} catch (ResponseException e) {
fail("expected snapshot to exist but it does not: " + EntityUtils.toString(e.getResponse().getEntity()));
}
});
}

Request delReq = new Request("DELETE", "/_ilm/snapshot/" + policyName);
assertOK(client().performRequest(delReq));

// It's possible there could have been a snapshot in progress when the
// policy is deleted, so wait for it to be finished
assertBusy(() -> {
assertThat(wipeSnapshots().size(), equalTo(0));
});
}

private void createSnapshotPolicy(String policyName, String snapshotNamePattern, String schedule, String repoId,
String indexPattern, boolean ignoreUnavailable) throws IOException {
Map<String, Object> snapConfig = new HashMap<>();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@
import org.elasticsearch.xpack.core.indexlifecycle.action.RetryAction;
import org.elasticsearch.xpack.core.indexlifecycle.action.StartILMAction;
import org.elasticsearch.xpack.core.indexlifecycle.action.StopILMAction;
import org.elasticsearch.xpack.core.snapshotlifecycle.action.ExecuteSnapshotLifecycleAction;
import org.elasticsearch.xpack.indexlifecycle.action.RestDeleteLifecycleAction;
import org.elasticsearch.xpack.indexlifecycle.action.RestExplainLifecycleAction;
import org.elasticsearch.xpack.indexlifecycle.action.RestGetLifecycleAction;
Expand Down Expand Up @@ -87,9 +88,11 @@
import org.elasticsearch.xpack.core.snapshotlifecycle.action.GetSnapshotLifecycleAction;
import org.elasticsearch.xpack.core.snapshotlifecycle.action.PutSnapshotLifecycleAction;
import org.elasticsearch.xpack.snapshotlifecycle.action.RestDeleteSnapshotLifecycleAction;
import org.elasticsearch.xpack.snapshotlifecycle.action.RestExecuteSnapshotLifecycleAction;
import org.elasticsearch.xpack.snapshotlifecycle.action.RestGetSnapshotLifecycleAction;
import org.elasticsearch.xpack.snapshotlifecycle.action.RestPutSnapshotLifecycleAction;
import org.elasticsearch.xpack.snapshotlifecycle.action.TransportDeleteSnapshotLifecycleAction;
import org.elasticsearch.xpack.snapshotlifecycle.action.TransportExecuteSnapshotLifecycleAction;
import org.elasticsearch.xpack.snapshotlifecycle.action.TransportGetSnapshotLifecycleAction;
import org.elasticsearch.xpack.snapshotlifecycle.action.TransportPutSnapshotLifecycleAction;

Expand Down Expand Up @@ -208,7 +211,8 @@ public List<RestHandler> getRestHandlers(Settings settings, RestController restC
// Snapshot lifecycle actions
new RestPutSnapshotLifecycleAction(settings, restController),
new RestDeleteSnapshotLifecycleAction(settings, restController),
new RestGetSnapshotLifecycleAction(settings, restController)
new RestGetSnapshotLifecycleAction(settings, restController),
new RestExecuteSnapshotLifecycleAction(settings, restController)
);
}

Expand All @@ -231,7 +235,8 @@ public List<RestHandler> getRestHandlers(Settings settings, RestController restC
// Snapshot lifecycle actions
new ActionHandler<>(PutSnapshotLifecycleAction.INSTANCE, TransportPutSnapshotLifecycleAction.class),
new ActionHandler<>(DeleteSnapshotLifecycleAction.INSTANCE, TransportDeleteSnapshotLifecycleAction.class),
new ActionHandler<>(GetSnapshotLifecycleAction.INSTANCE, TransportGetSnapshotLifecycleAction.class));
new ActionHandler<>(GetSnapshotLifecycleAction.INSTANCE, TransportGetSnapshotLifecycleAction.class),
new ActionHandler<>(ExecuteSnapshotLifecycleAction.INSTANCE, TransportExecuteSnapshotLifecycleAction.class));
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -180,7 +180,7 @@ public void maybeScheduleSnapshot(final SnapshotLifecyclePolicyMetadata snapshot
/**
* Generate the job id for a given policy metadata. The job id is {@code <policyid>-<version>}
*/
static String getJobId(SnapshotLifecyclePolicyMetadata policyMeta) {
public static String getJobId(SnapshotLifecyclePolicyMetadata policyMeta) {
return policyMeta.getPolicy().getId() + "-" + policyMeta.getVersion();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,17 +53,37 @@ public SnapshotLifecycleTask(final Client client, final ClusterService clusterSe
@Override
public void triggered(SchedulerEngine.Event event) {
logger.debug("snapshot lifecycle policy task triggered from job [{}]", event.getJobName());
Optional<SnapshotLifecyclePolicyMetadata> maybeMetadata = getSnapPolicyMetadata(event.getJobName(), clusterService.state());
// If we were on JDK 9 and could use ifPresentOrElse this would be simpler.
boolean successful = maybeMetadata.map(policyMetadata -> {

final Optional<String> snapshotName = maybeTakeSnapshot(event.getJobName(), client, clusterService);

// Would be cleaner if we could use Optional#ifPresentOrElse
snapshotName.ifPresent(name ->
logger.info("snapshot lifecycle policy job [{}] issued new snapshot creation for [{}] successfully",
event.getJobName(), name));

if (snapshotName.isPresent() == false) {
logger.warn("snapshot lifecycle policy for job [{}] no longer exists, snapshot not created", event.getJobName());
}
}

/**
* For the given job id (a combination of policy id and version), issue a create snapshot
* request. On a successful or failed create snapshot issuing the state is stored in the cluster
* state in the policy's metadata
* @return An optional snapshot name if the request was issued successfully
*/
public static Optional<String> maybeTakeSnapshot(final String jobId, final Client client, final ClusterService clusterService) {
Optional<SnapshotLifecyclePolicyMetadata> maybeMetadata = getSnapPolicyMetadata(jobId, clusterService.state());
String snapshotName = maybeMetadata.map(policyMetadata -> {
CreateSnapshotRequest request = policyMetadata.getPolicy().toRequest();
final LifecyclePolicySecurityClient clientWithHeaders = new LifecyclePolicySecurityClient(this.client,
final LifecyclePolicySecurityClient clientWithHeaders = new LifecyclePolicySecurityClient(client,
ClientHelper.INDEX_LIFECYCLE_ORIGIN, policyMetadata.getHeaders());
logger.info("triggering periodic snapshot for policy [{}]", policyMetadata.getPolicy().getId());
clientWithHeaders.admin().cluster().createSnapshot(request, new ActionListener<CreateSnapshotResponse>() {
logger.info("snapshot lifecycle policy [{}] issuing create snapshot [{}]",
policyMetadata.getPolicy().getId(), request.snapshot());
clientWithHeaders.admin().cluster().createSnapshot(request, new ActionListener<>() {
@Override
public void onResponse(CreateSnapshotResponse createSnapshotResponse) {
logger.info("snapshot response for [{}]: {}",
logger.debug("snapshot response for [{}]: {}",
policyMetadata.getPolicy().getId(), Strings.toString(createSnapshotResponse));
clusterService.submitStateUpdateTask("slm-record-success-" + policyMetadata.getPolicy().getId(),
WriteJobStatus.success(policyMetadata.getPolicy().getId(), request.snapshot(), Instant.now().toEpochMilli()));
Expand All @@ -77,12 +97,10 @@ public void onFailure(Exception e) {
WriteJobStatus.failure(policyMetadata.getPolicy().getId(), request.snapshot(), Instant.now().toEpochMilli(), e));
}
});
return true;
}).orElse(false);
return request.snapshot();
}).orElse(null);

if (successful == false) {
logger.warn("snapshot lifecycle policy for job [{}] no longer exists, snapshot not created", event.getJobName());
}
return Optional.ofNullable(snapshotName);
}

/**
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/

package org.elasticsearch.xpack.snapshotlifecycle.action;

import org.elasticsearch.client.node.NodeClient;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.rest.BaseRestHandler;
import org.elasticsearch.rest.RestController;
import org.elasticsearch.rest.RestRequest;
import org.elasticsearch.rest.action.RestToXContentListener;
import org.elasticsearch.xpack.core.snapshotlifecycle.action.ExecuteSnapshotLifecycleAction;

import java.io.IOException;

public class RestExecuteSnapshotLifecycleAction extends BaseRestHandler {

public RestExecuteSnapshotLifecycleAction(Settings settings, RestController controller) {
super(settings);
controller.registerHandler(RestRequest.Method.PUT, "/_ilm/snapshot/{name}/_execute", this);
}

@Override
public String getName() {
return "ilm_execute_snapshot_lifecycle";
}

@Override
protected RestChannelConsumer prepareRequest(RestRequest request, NodeClient client) throws IOException {
String snapLifecycleId = request.param("name");
ExecuteSnapshotLifecycleAction.Request req = new ExecuteSnapshotLifecycleAction.Request(snapLifecycleId);
req.timeout(request.paramAsTime("timeout", req.timeout()));
req.masterNodeTimeout(request.paramAsTime("master_timeout", req.masterNodeTimeout()));
return channel -> client.execute(ExecuteSnapshotLifecycleAction.INSTANCE, req, new RestToXContentListener<>(channel));
}
}
Loading

0 comments on commit 1b4574a

Please sign in to comment.