Skip to content

Commit

Permalink
Implement SnapshotRetentionTask's snapshot filtering and deletion
Browse files Browse the repository at this point in the history
This commit implements the snapshot filtering and deletion for
`SnapshotRetentionTask`. Currently only the expire-after age is used for
determining whether a snapshot is eligible for deletion.

Relates to elastic#43663
  • Loading branch information
dakrone committed Jul 23, 2019
1 parent f520f42 commit 262594b
Show file tree
Hide file tree
Showing 5 changed files with 437 additions and 22 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,8 @@
public class SnapshotLifecyclePolicy extends AbstractDiffable<SnapshotLifecyclePolicy>
implements Writeable, Diffable<SnapshotLifecyclePolicy>, ToXContentObject {

public static final String POLICY_ID_METADATA_FIELD = "policy";

private final String id;
private final String name;
private final String schedule;
Expand All @@ -61,7 +63,6 @@ public class SnapshotLifecyclePolicy extends AbstractDiffable<SnapshotLifecycleP
private static final ParseField RETENTION = new ParseField("retention");
private static final IndexNameExpressionResolver.DateMathExpressionResolver DATE_MATH_RESOLVER =
new IndexNameExpressionResolver.DateMathExpressionResolver();
private static final String POLICY_ID_METADATA_FIELD = "policy";
private static final String METADATA_FIELD_NAME = "metadata";

@SuppressWarnings("unchecked")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,12 @@
import org.elasticsearch.common.xcontent.ToXContentObject;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.snapshots.SnapshotInfo;

import java.io.IOException;
import java.util.List;
import java.util.Objects;
import java.util.function.Predicate;

public class SnapshotRetentionConfiguration implements ToXContentObject, Writeable {

Expand Down Expand Up @@ -56,6 +59,26 @@ public TimeValue getExpireAfter() {
return this.expireAfter;
}

/**
 * Builds the test used to decide whether a single snapshot should be deleted
 * under this retention configuration.
 *
 * Only {@code expireAfter} is consulted: a snapshot matches when the current
 * wall-clock time minus the snapshot's start time is strictly greater than
 * {@code expireAfter}. When no {@code expireAfter} is set, nothing matches.
 *
 * @param allSnapshots a list of all snapshots pertaining to this SLM policy and repository;
 *                     it is not read by the age-based check
 * @return a predicate that is {@code true} for snapshots eligible for deletion
 */
public Predicate<SnapshotInfo> getSnapshotDeletionPredicate(final List<SnapshotInfo> allSnapshots) {
    return candidate -> {
        if (this.expireAfter == null) {
            // No age limit configured, so the snapshot is not eligible for deletion
            return false;
        }
        final long elapsed = System.currentTimeMillis() - candidate.startTime();
        final TimeValue snapshotAge = new TimeValue(elapsed);
        // Eligible only when strictly older than the configured limit
        return snapshotAge.compareTo(this.expireAfter) > 0;
    };
}

@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.startObject();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,12 +7,15 @@
package org.elasticsearch.xpack.slm;

import org.apache.http.util.EntityUtils;
import org.elasticsearch.action.admin.cluster.settings.ClusterUpdateSettingsRequest;
import org.elasticsearch.action.index.IndexRequestBuilder;
import org.elasticsearch.client.Request;
import org.elasticsearch.client.Response;
import org.elasticsearch.client.ResponseException;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.xcontent.DeprecationHandler;
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
import org.elasticsearch.common.xcontent.ToXContent;
Expand All @@ -22,6 +25,7 @@
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.common.xcontent.json.JsonXContent;
import org.elasticsearch.test.rest.ESRestTestCase;
import org.elasticsearch.xpack.core.indexlifecycle.LifecycleSettings;
import org.elasticsearch.xpack.core.snapshotlifecycle.SnapshotLifecyclePolicy;
import org.elasticsearch.xpack.core.snapshotlifecycle.SnapshotRetentionConfiguration;

Expand Down Expand Up @@ -222,6 +226,81 @@ public void testPolicyManualExecution() throws Exception {
});
}

/**
 * Checks that a snapshot taken by an SLM policy whose retention expires after one
 * millisecond is removed once the retention task is scheduled to run every second.
 *
 * Steps: index some documents, create a repository and a policy, execute the policy
 * manually, wait for the snapshot to appear, switch the retention schedule to every
 * second, wait for the snapshot to go missing, then delete the policy and clean up.
 */
public void testBasicTimeBasedRetenion() throws Exception {
    final String indexName = "test";
    final String policyName = "test-policy";
    final String repoId = "my-repo";
    int docCount = randomIntBetween(10, 50);
    for (int i = 0; i < docCount; i++) {
        index(client(), indexName, "" + i, "foo", "bar");
    }

    // Create a snapshot repo
    inializeRepo(repoId);

    // Create a policy with a retention period of 1 millisecond
    createSnapshotPolicy(policyName, "snap", "1 2 3 4 5 ?", repoId, indexName, true,
        new SnapshotRetentionConfiguration(TimeValue.timeValueMillis(1)));

    // Manually create a snapshot
    Response executeResp = client().performRequest(new Request("PUT", "/_slm/policy/" + policyName + "/_execute"));

    final String snapshotName;
    try (XContentParser parser = JsonXContent.jsonXContent.createParser(NamedXContentRegistry.EMPTY,
        DeprecationHandler.THROW_UNSUPPORTED_OPERATION, EntityUtils.toByteArray(executeResp.getEntity()))) {
        snapshotName = parser.mapStrings().get("snapshot_name");

        // Check that the executed snapshot is created
        assertBusy(() -> {
            try {
                Response response = client().performRequest(new Request("GET", "/_snapshot/" + repoId + "/" + snapshotName));
                Map<String, Object> snapshotResponseMap;
                try (InputStream is = response.getEntity().getContent()) {
                    snapshotResponseMap = XContentHelper.convertToMap(XContentType.JSON.xContent(), is, true);
                }
                assertThat(snapshotResponseMap.size(), greaterThan(0));
                final Map<String, Object> metadata = extractMetadata(snapshotResponseMap, snapshotName);
                assertNotNull(metadata);
                assertThat(metadata.get("policy"), equalTo(policyName));
                assertHistoryIsPresent(policyName, true, repoId);
            } catch (ResponseException e) {
                fail("expected snapshot to exist but it does not: " + EntityUtils.toString(e.getResponse().getEntity()));
            }
        });
    }

    // Run retention every second
    ClusterUpdateSettingsRequest req = new ClusterUpdateSettingsRequest();
    req.transientSettings(Settings.builder().put(LifecycleSettings.SLM_RETENTION_SCHEDULE, "*/1 * * * * ?"));
    try (XContentBuilder builder = jsonBuilder()) {
        req.toXContent(builder, ToXContent.EMPTY_PARAMS);
        Request r = new Request("PUT", "/_cluster/settings");
        r.setJsonEntity(Strings.toString(builder));
        // Verify the schedule change was accepted instead of discarding the response
        assertOK(client().performRequest(r));
    }

    // Check that the snapshot created by the policy has been removed by retention
    assertBusy(() -> {
        // We expect a failed response because the snapshot should not exist
        try {
            Response response = client().performRequest(new Request("GET", "/_snapshot/" + repoId + "/" + snapshotName));
            assertThat(EntityUtils.toString(response.getEntity()), containsString("snapshot_missing_exception"));
        } catch (ResponseException e) {
            assertThat(EntityUtils.toString(e.getResponse().getEntity()), containsString("snapshot_missing_exception"));
        }
    });

    Request delReq = new Request("DELETE", "/_slm/policy/" + policyName);
    assertOK(client().performRequest(delReq));

    // It's possible there could have been a snapshot in progress when the
    // policy is deleted, so wait for it to be finished
    assertBusy(() -> {
        assertThat(wipeSnapshots().size(), equalTo(0));
    });
}

@SuppressWarnings("unchecked")
private static Map<String, Object> extractMetadata(Map<String, Object> snapshotResponseMap, String snapshotPrefix) {
List<Map<String, Object>> snapResponse = ((List<Map<String, Object>>) snapshotResponseMap.get("responses")).stream()
Expand Down Expand Up @@ -284,6 +363,13 @@ private void assertHistoryIsPresent(String policyName, boolean success, String r

/**
 * Convenience overload for callers that do not need retention: forwards every
 * argument to the seven-argument overload and supplies
 * {@link SnapshotRetentionConfiguration#EMPTY} as the retention configuration.
 */
private void createSnapshotPolicy(String policyName, String snapshotNamePattern, String schedule, String repoId,
                                  String indexPattern, boolean ignoreUnavailable) throws IOException {
    final SnapshotRetentionConfiguration noRetention = SnapshotRetentionConfiguration.EMPTY;
    createSnapshotPolicy(policyName, snapshotNamePattern, schedule, repoId, indexPattern, ignoreUnavailable, noRetention);
}

private void createSnapshotPolicy(String policyName, String snapshotNamePattern, String schedule, String repoId,
String indexPattern, boolean ignoreUnavailable,
SnapshotRetentionConfiguration retention) throws IOException {
Map<String, Object> snapConfig = new HashMap<>();
snapConfig.put("indices", Collections.singletonList(indexPattern));
snapConfig.put("ignore_unavailable", ignoreUnavailable);
Expand All @@ -295,8 +381,8 @@ private void createSnapshotPolicy(String policyName, String snapshotNamePattern,
() -> randomAlphaOfLength(5)), randomAlphaOfLength(4));
}
}
SnapshotLifecyclePolicy policy = new SnapshotLifecyclePolicy(policyName, snapshotNamePattern, schedule, repoId, snapConfig,
SnapshotRetentionConfiguration.EMPTY);
SnapshotLifecyclePolicy policy = new SnapshotLifecyclePolicy(policyName, snapshotNamePattern, schedule,
repoId, snapConfig, retention);

Request putLifecycle = new Request("PUT", "/_slm/policy/" + policyName);
XContentBuilder lifecycleBuilder = JsonXContent.contentBuilder();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,20 +8,31 @@

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.admin.cluster.snapshots.get.GetSnapshotsResponse;
import org.elasticsearch.client.Client;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.collect.Tuple;
import org.elasticsearch.snapshots.SnapshotInfo;
import org.elasticsearch.xpack.core.scheduler.SchedulerEngine;
import org.elasticsearch.xpack.core.snapshotlifecycle.SnapshotLifecycleMetadata;
import org.elasticsearch.xpack.core.snapshotlifecycle.SnapshotLifecyclePolicy;
import org.elasticsearch.xpack.core.snapshotlifecycle.SnapshotRetentionConfiguration;

import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;
import java.util.stream.Collectors;
import java.util.stream.Stream;

/**
* The {@code SnapshotRetentionTask} is invoked by the scheduled job from the
Expand Down Expand Up @@ -60,40 +71,121 @@ public void triggered(SchedulerEngine.Event event) {
.map(SnapshotLifecyclePolicy::getRepository)
.collect(Collectors.toSet());

// Find all the snapshots that are past their retention date
// TODO: include min/max snapshot count as a criteria for deletion also
final List<SnapshotInfo> snapshotsToBeDeleted = getAllSnapshots(repositioriesToFetch).stream()
.filter(snapshot -> snapshotEligibleForDeletion(snapshot, policiesWithRetention))
.collect(Collectors.toList());
getAllSnapshots(repositioriesToFetch, new ActionListener<>() {
@Override
public void onResponse(List<Tuple<String, SnapshotInfo>> allSnapshots) {
// Find all the snapshots that are past their retention date
final List<Tuple<String, SnapshotInfo>> snapshotsToBeDeleted = allSnapshots.stream()
.filter(snapshot -> snapshotEligibleForDeletion(snapshot.v2(), allSnapshots, policiesWithRetention))
.collect(Collectors.toList());

// Finally, delete the snapshots that need to be deleted
deleteSnapshots(snapshotsToBeDeleted);
// Finally, delete the snapshots that need to be deleted
deleteSnapshots(snapshotsToBeDeleted);
}

@Override
public void onFailure(Exception e) {
running.set(false);
}
}, err -> running.set(false));

} finally {
running.set(false);
}
} else {
logger.debug("snapshot lifecycle retention task started, but a task is already running, skipping");
logger.trace("snapshot lifecycle retention task started, but a task is already running, skipping");
}
}

/**
 * Collects the SLM policies found in the cluster state that have retention configured.
 *
 * Reads the {@link SnapshotLifecycleMetadata} custom from the cluster state metadata and
 * keeps each entry whose policy has a retention configuration that is neither {@code null}
 * nor equal to {@link SnapshotRetentionConfiguration#EMPTY}.
 *
 * @param state the cluster state to read the snapshot lifecycle metadata from
 * @return a map from the snapshot-configuration key to its policy; an empty map when the
 *         cluster state carries no snapshot lifecycle metadata
 */
static Map<String, SnapshotLifecyclePolicy> getAllPoliciesWithRetentionEnabled(final ClusterState state) {
// NOTE(review): the next two lines look like the placeholder body that this commit removes;
// the diff view shows removed and added lines together — confirm against the committed file.
// TODO: fill me in
return Collections.emptyMap();
// Implementation: look up the SLM custom metadata; it is absent when no metadata is stored
final SnapshotLifecycleMetadata snapMeta = state.metaData().custom(SnapshotLifecycleMetadata.TYPE);
if (snapMeta == null) {
return Collections.emptyMap();
}
// Keep only entries with a non-null, non-EMPTY retention configuration
return snapMeta.getSnapshotConfigurations().entrySet().stream()
.filter(e -> e.getValue().getPolicy().getRetentionPolicy() != null)
.filter(e -> e.getValue().getPolicy().getRetentionPolicy().equals(SnapshotRetentionConfiguration.EMPTY) == false)
.collect(Collectors.toMap(Map.Entry::getKey, e -> e.getValue().getPolicy()));
}

static boolean snapshotEligibleForDeletion(SnapshotInfo snapshot, Map<String, SnapshotLifecyclePolicy> policies) {
// TODO: fill me in
return false;
/**
 * Decides whether a single snapshot should be deleted according to the retention
 * configuration of the SLM policy recorded in the snapshot's user metadata.
 *
 * A snapshot is never eligible when it has no user metadata, when the metadata holds no
 * policy id, when the policy id is not present in {@code policies}, or when that policy
 * has no (or an empty) retention configuration. Otherwise the policy's deletion predicate
 * is built from the snapshots in the policy's repository that carry the same policy id,
 * and then applied to {@code snapshot}.
 *
 * @param snapshot     the snapshot being evaluated
 * @param allSnapshots every retrieved snapshot, paired with its repository name
 *                     (tuple {@code v1} is the repository, {@code v2} the snapshot)
 * @param policies     the policies to consult, looked up by policy id
 * @return {@code true} when the snapshot is eligible for deletion
 * @throws RuntimeException any exception raised while reading the policy id from the
 *                          metadata (for example a non-string value) is logged and rethrown
 */
static boolean snapshotEligibleForDeletion(SnapshotInfo snapshot, List<Tuple<String, SnapshotInfo>> allSnapshots,
                                           Map<String, SnapshotLifecyclePolicy> policies) {
    if (snapshot.userMetadata() == null) {
        // This snapshot has no metadata, it is not eligible for deletion
        return false;
    }

    final String policyId;
    try {
        policyId = (String) snapshot.userMetadata().get(SnapshotLifecyclePolicy.POLICY_ID_METADATA_FIELD);
    } catch (Exception e) {
        logger.error("unable to retrieve policy id from snapshot metadata [" + snapshot.userMetadata() + "]", e);
        throw e;
    }

    if (policyId == null) {
        // Metadata is present but records no policy id; decide here rather than relying on
        // the policies map tolerating a null key
        return false;
    }

    SnapshotLifecyclePolicy policy = policies.get(policyId);
    if (policy == null) {
        // This snapshot was taken by a policy that doesn't exist, so it's not eligible
        return false;
    }

    SnapshotRetentionConfiguration retention = policy.getRetentionPolicy();
    if (retention == null || retention.equals(SnapshotRetentionConfiguration.EMPTY)) {
        // Retention is not configured
        return false;
    }

    final String repository = policy.getRepository();
    // Narrow the candidates to snapshots pertaining only to *this* policy and repository
    final List<SnapshotInfo> snapshotsForPolicy = allSnapshots.stream()
        .filter(t -> t.v1().equals(repository))
        .map(Tuple::v2)
        .filter(info -> {
            final Map<String, Object> meta = info.userMetadata();
            return meta != null && policyId.equals(meta.get(SnapshotLifecyclePolicy.POLICY_ID_METADATA_FIELD));
        })
        .collect(Collectors.toList());

    boolean eligible = retention.getSnapshotDeletionPredicate(snapshotsForPolicy).test(snapshot);
    logger.debug("[{}] testing snapshot [{}] deletion eligibility: {}",
        repository, snapshot.snapshotId(), eligible ? "ELIGIBLE" : "INELIGIBLE");
    return eligible;
}

List<SnapshotInfo> getAllSnapshots(Collection<String> repositories) {
// TODO: fill me in
return Collections.emptyList();
/**
 * Asynchronously fetches the snapshots of the given repositories and hands them to
 * {@code listener}, each paired with the name of the repository it was read from.
 *
 * A repository whose snapshots cannot be read from the response is logged at debug level
 * and contributes no entries. If the request itself fails, the failure is logged at debug
 * level and passed to {@code errorHandler}.
 *
 * @param repositories the names of the repositories to query
 * @param listener     receives the (repository, snapshot) tuples on success
 * @param errorHandler receives the exception when the get-snapshots request fails
 */
void getAllSnapshots(Collection<String> repositories, ActionListener<List<Tuple<String, SnapshotInfo>>> listener,
                     Consumer<Exception> errorHandler) {
    final ActionListener<GetSnapshotsResponse> responseHandler = new ActionListener<GetSnapshotsResponse>() {
        @Override
        public void onResponse(final GetSnapshotsResponse resp) {
            // Tag every snapshot with its repository, skipping repositories that cannot be read
            final List<Tuple<String, SnapshotInfo>> tagged = repositories.stream()
                .flatMap(repo -> {
                    try {
                        return resp.getSnapshots(repo).stream()
                            .map(si -> new Tuple<>(repo, si));
                    } catch (Exception e) {
                        logger.debug(new ParameterizedMessage("unable to retrieve snapshots for [{}] repository", repo), e);
                        return Stream.empty();
                    }
                })
                .collect(Collectors.toList());
            listener.onResponse(tagged);
        }

        @Override
        public void onFailure(Exception e) {
            logger.debug(new ParameterizedMessage("unable to retrieve snapshots for [{}] repositories", repositories), e);
            errorHandler.accept(e);
        }
    };

    final String[] repositoryNames = repositories.toArray(Strings.EMPTY_ARRAY);
    client.admin().cluster().prepareGetSnapshots(repositoryNames)
        .setIgnoreUnavailable(true)
        .execute(responseHandler);
}

void deleteSnapshots(List<SnapshotInfo> snapshotsToDelete) {
// TODO: fill me in
logger.info("deleting {}", snapshotsToDelete);
/**
 * Deletes the given snapshots one at a time, blocking on each delete request.
 *
 * A failure to delete one snapshot is logged and does not prevent the remaining
 * snapshots from being attempted.
 *
 * @param snapshotsToDelete the snapshots to delete, each paired with its repository name
 *                          (tuple {@code v1} is the repository, {@code v2} the snapshot)
 */
void deleteSnapshots(List<Tuple<String, SnapshotInfo>> snapshotsToDelete) {
    // TODO: make this more resilient and possibly only delete for a certain amount of time
    logger.info("starting snapshot retention deletion for [{}] snapshots", snapshotsToDelete.size());
    for (Tuple<String, SnapshotInfo> snap : snapshotsToDelete) {
        final String repository = snap.v1();
        final SnapshotInfo info = snap.v2();
        logger.info("[{}] snapshot retention deleting snapshot [{}]", repository, info.snapshotId());
        try {
            client.admin().cluster().prepareDeleteSnapshot(repository, info.snapshotId().getName()).get();
        } catch (Exception e) {
            // Keep going: one snapshot that cannot be deleted must not block the others
            logger.warn(new ParameterizedMessage("[{}] failed to delete snapshot [{}] for retention",
                repository, info.snapshotId()), e);
        }
    }
}
}
Loading

0 comments on commit 262594b

Please sign in to comment.