From 262594b74a5ae5e66873a9fc9a905372e3c6d024 Mon Sep 17 00:00:00 2001 From: Lee Hinman Date: Wed, 17 Jul 2019 16:45:40 -0600 Subject: [PATCH] Implement SnapshotRetentionTask's snapshot filtering and deletion This commit implements the snapshot filtering and deletion for `SnapshotRetentionTask`. Currently only the expire-after age is used for determining whether a snapshot is eligible for deletion. Relates to #43663 --- .../SnapshotLifecyclePolicy.java | 3 +- .../SnapshotRetentionConfiguration.java | 23 ++ .../xpack/slm/SnapshotLifecycleIT.java | 90 +++++++- .../xpack/slm/SnapshotRetentionTask.java | 130 +++++++++-- .../xpack/slm/SnapshotRetentionTaskTests.java | 213 ++++++++++++++++++ 5 files changed, 437 insertions(+), 22 deletions(-) create mode 100644 x-pack/plugin/ilm/src/test/java/org/elasticsearch/xpack/slm/SnapshotRetentionTaskTests.java diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/snapshotlifecycle/SnapshotLifecyclePolicy.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/snapshotlifecycle/SnapshotLifecyclePolicy.java index a187ad7e9e076..755d389734bfe 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/snapshotlifecycle/SnapshotLifecyclePolicy.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/snapshotlifecycle/SnapshotLifecyclePolicy.java @@ -47,6 +47,8 @@ public class SnapshotLifecyclePolicy extends AbstractDiffable implements Writeable, Diffable, ToXContentObject { + public static final String POLICY_ID_METADATA_FIELD = "policy"; + private final String id; private final String name; private final String schedule; @@ -61,7 +63,6 @@ public class SnapshotLifecyclePolicy extends AbstractDiffable getSnapshotDeletionPredicate(final List allSnapshots) { + return si -> { + if (this.expireAfter != null) { + TimeValue snapshotAge = new TimeValue(System.currentTimeMillis() - si.startTime()); + if (snapshotAge.compareTo(this.expireAfter) > 0) { + return true; + } else { + return false; + } + } + // If nothing matched, the snapshot is not eligible for deletion + return false; + }; + } + @Override public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { builder.startObject(); diff --git a/x-pack/plugin/ilm/qa/multi-node/src/test/java/org/elasticsearch/xpack/slm/SnapshotLifecycleIT.java b/x-pack/plugin/ilm/qa/multi-node/src/test/java/org/elasticsearch/xpack/slm/SnapshotLifecycleIT.java index 897358378debc..4c87e2084678e 100644 --- a/x-pack/plugin/ilm/qa/multi-node/src/test/java/org/elasticsearch/xpack/slm/SnapshotLifecycleIT.java +++ b/x-pack/plugin/ilm/qa/multi-node/src/test/java/org/elasticsearch/xpack/slm/SnapshotLifecycleIT.java @@ -7,12 +7,15 @@ package org.elasticsearch.xpack.slm; import org.apache.http.util.EntityUtils; +import org.elasticsearch.action.admin.cluster.settings.ClusterUpdateSettingsRequest; import org.elasticsearch.action.index.IndexRequestBuilder; import org.elasticsearch.client.Request; import org.elasticsearch.client.Response; import org.elasticsearch.client.ResponseException; import org.elasticsearch.client.RestClient; import org.elasticsearch.common.Strings; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.xcontent.DeprecationHandler; import org.elasticsearch.common.xcontent.NamedXContentRegistry; import org.elasticsearch.common.xcontent.ToXContent; @@ -22,6 +25,7 @@ import org.elasticsearch.common.xcontent.XContentType; import org.elasticsearch.common.xcontent.json.JsonXContent; import org.elasticsearch.test.rest.ESRestTestCase; +import org.elasticsearch.xpack.core.indexlifecycle.LifecycleSettings; import org.elasticsearch.xpack.core.snapshotlifecycle.SnapshotLifecyclePolicy; import org.elasticsearch.xpack.core.snapshotlifecycle.SnapshotRetentionConfiguration; @@ -222,6 +226,81 @@ public void testPolicyManualExecution() throws Exception { }); } + public void testBasicTimeBasedRetenion() throws Exception { + final String indexName = "test"; + final String policyName = "test-policy"; + final String repoId = "my-repo"; + int docCount = randomIntBetween(10, 50); + List indexReqs = new ArrayList<>(); + for (int i = 0; i < docCount; i++) { + index(client(), indexName, "" + i, "foo", "bar"); + } + + // Create a snapshot repo + inializeRepo(repoId); + + // Create a policy with a retention period of 1 millisecond + createSnapshotPolicy(policyName, "snap", "1 2 3 4 5 ?", repoId, indexName, true, + new SnapshotRetentionConfiguration(TimeValue.timeValueMillis(1))); + + // Manually create a snapshot + Response executeResp = client().performRequest(new Request("PUT", "/_slm/policy/" + policyName + "/_execute")); + + final String snapshotName; + try (XContentParser parser = JsonXContent.jsonXContent.createParser(NamedXContentRegistry.EMPTY, + DeprecationHandler.THROW_UNSUPPORTED_OPERATION, EntityUtils.toByteArray(executeResp.getEntity()))) { + snapshotName = parser.mapStrings().get("snapshot_name"); + + // Check that the executed snapshot is created + assertBusy(() -> { + try { + Response response = client().performRequest(new Request("GET", "/_snapshot/" + repoId + "/" + snapshotName)); + Map snapshotResponseMap; + try (InputStream is = response.getEntity().getContent()) { + snapshotResponseMap = XContentHelper.convertToMap(XContentType.JSON.xContent(), is, true); + } + assertThat(snapshotResponseMap.size(), greaterThan(0)); + final Map metadata = extractMetadata(snapshotResponseMap, snapshotName); + assertNotNull(metadata); + assertThat(metadata.get("policy"), equalTo(policyName)); + assertHistoryIsPresent(policyName, true, repoId); + } catch (ResponseException e) { + fail("expected snapshot to exist but it does not: " + EntityUtils.toString(e.getResponse().getEntity())); + } + }); + } + + // Run retention every second + ClusterUpdateSettingsRequest req = new ClusterUpdateSettingsRequest(); + req.transientSettings(Settings.builder().put(LifecycleSettings.SLM_RETENTION_SCHEDULE, "*/1 * * * * ?")); + try (XContentBuilder builder = jsonBuilder()) { + req.toXContent(builder, ToXContent.EMPTY_PARAMS); + Request r = new Request("PUT", "/_cluster/settings"); + r.setJsonEntity(Strings.toString(builder)); + Response updateSettingsResp = client().performRequest(r); + } + + // Check that the snapshot created by the policy has been removed by retention + assertBusy(() -> { + // We expect a failed response because the snapshot should not exist + try { + Response response = client().performRequest(new Request("GET", "/_snapshot/" + repoId + "/" + snapshotName)); + assertThat(EntityUtils.toString(response.getEntity()), containsString("snapshot_missing_exception")); + } catch (ResponseException e) { + assertThat(EntityUtils.toString(e.getResponse().getEntity()), containsString("snapshot_missing_exception")); + } + }); + + Request delReq = new Request("DELETE", "/_slm/policy/" + policyName); + assertOK(client().performRequest(delReq)); + + // It's possible there could have been a snapshot in progress when the + // policy is deleted, so wait for it to be finished + assertBusy(() -> { + assertThat(wipeSnapshots().size(), equalTo(0)); + }); + } + @SuppressWarnings("unchecked") private static Map extractMetadata(Map snapshotResponseMap, String snapshotPrefix) { List> snapResponse = ((List>) snapshotResponseMap.get("responses")).stream() @@ -284,6 +363,13 @@ private void assertHistoryIsPresent(String policyName, boolean success, String r private void createSnapshotPolicy(String policyName, String snapshotNamePattern, String schedule, String repoId, String indexPattern, boolean ignoreUnavailable) throws IOException { + createSnapshotPolicy(policyName, snapshotNamePattern, schedule, repoId, indexPattern, + ignoreUnavailable, SnapshotRetentionConfiguration.EMPTY); + } + + private void createSnapshotPolicy(String policyName, String snapshotNamePattern, String schedule, String repoId, + String indexPattern, boolean ignoreUnavailable, + SnapshotRetentionConfiguration retention) throws IOException { Map snapConfig = new HashMap<>(); snapConfig.put("indices", Collections.singletonList(indexPattern)); snapConfig.put("ignore_unavailable", ignoreUnavailable); @@ -295,8 +381,8 @@ private void createSnapshotPolicy(String policyName, String snapshotNamePattern, () -> randomAlphaOfLength(5)), randomAlphaOfLength(4)); } } - SnapshotLifecyclePolicy policy = new SnapshotLifecyclePolicy(policyName, snapshotNamePattern, schedule, repoId, snapConfig, - SnapshotRetentionConfiguration.EMPTY); + SnapshotLifecyclePolicy policy = new SnapshotLifecyclePolicy(policyName, snapshotNamePattern, schedule, + repoId, snapConfig, retention); Request putLifecycle = new Request("PUT", "/_slm/policy/" + policyName); XContentBuilder lifecycleBuilder = JsonXContent.contentBuilder(); diff --git a/x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/slm/SnapshotRetentionTask.java b/x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/slm/SnapshotRetentionTask.java index bee258964c5eb..8a71a912ca72f 100644 --- a/x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/slm/SnapshotRetentionTask.java +++ b/x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/slm/SnapshotRetentionTask.java @@ -8,20 +8,31 @@ import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; +import org.apache.logging.log4j.message.ParameterizedMessage; +import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.admin.cluster.snapshots.get.GetSnapshotsResponse; import org.elasticsearch.client.Client; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.service.ClusterService; +import org.elasticsearch.common.Strings; +import org.elasticsearch.common.collect.Tuple; import org.elasticsearch.snapshots.SnapshotInfo; import org.elasticsearch.xpack.core.scheduler.SchedulerEngine; +import org.elasticsearch.xpack.core.snapshotlifecycle.SnapshotLifecycleMetadata; import org.elasticsearch.xpack.core.snapshotlifecycle.SnapshotLifecyclePolicy; +import org.elasticsearch.xpack.core.snapshotlifecycle.SnapshotRetentionConfiguration; import java.util.Collection; import java.util.Collections; import java.util.List; import java.util.Map; +import java.util.Optional; import java.util.Set; +import java.util.concurrent.CountDownLatch; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.function.Consumer; import java.util.stream.Collectors; +import java.util.stream.Stream; /** * The {@code SnapshotRetentionTask} is invoked by the scheduled job from the @@ -60,40 +71,121 @@ public void triggered(SchedulerEngine.Event event) { .map(SnapshotLifecyclePolicy::getRepository) .collect(Collectors.toSet()); - // Find all the snapshots that are past their retention date - // TODO: include min/max snapshot count as a criteria for deletion also - final List snapshotsToBeDeleted = getAllSnapshots(repositioriesToFetch).stream() - .filter(snapshot -> snapshotEligibleForDeletion(snapshot, policiesWithRetention)) - .collect(Collectors.toList()); + getAllSnapshots(repositioriesToFetch, new ActionListener<>() { + @Override + public void onResponse(List> allSnapshots) { + // Find all the snapshots that are past their retention date + final List> snapshotsToBeDeleted = allSnapshots.stream() + .filter(snapshot -> snapshotEligibleForDeletion(snapshot.v2(), allSnapshots, policiesWithRetention)) + .collect(Collectors.toList()); - // Finally, delete the snapshots that need to be deleted - deleteSnapshots(snapshotsToBeDeleted); + // Finally, delete the snapshots that need to be deleted + deleteSnapshots(snapshotsToBeDeleted); + } + + @Override + public void onFailure(Exception e) { + running.set(false); + } + }, err -> running.set(false)); } finally { running.set(false); } } else { - logger.debug("snapshot lifecycle retention task started, but a task is already running, skipping"); + logger.trace("snapshot lifecycle retention task started, but a task is already running, skipping"); } } static Map getAllPoliciesWithRetentionEnabled(final ClusterState state) { - // TODO: fill me in - return Collections.emptyMap(); + final SnapshotLifecycleMetadata snapMeta = state.metaData().custom(SnapshotLifecycleMetadata.TYPE); + if (snapMeta == null) { + return Collections.emptyMap(); + } + return snapMeta.getSnapshotConfigurations().entrySet().stream() + .filter(e -> e.getValue().getPolicy().getRetentionPolicy() != null) + .filter(e -> e.getValue().getPolicy().getRetentionPolicy().equals(SnapshotRetentionConfiguration.EMPTY) == false) + .collect(Collectors.toMap(Map.Entry::getKey, e -> e.getValue().getPolicy())); } - static boolean snapshotEligibleForDeletion(SnapshotInfo snapshot, Map policies) { - // TODO: fill me in - return false; + static boolean snapshotEligibleForDeletion(SnapshotInfo snapshot, List> allSnapshots, + Map policies) { + if (snapshot.userMetadata() == null) { + // This snapshot has no metadata, it is not eligible for deletion + return false; + } + + final String policyId; + try { + policyId = (String) snapshot.userMetadata().get(SnapshotLifecyclePolicy.POLICY_ID_METADATA_FIELD); + } catch (Exception e) { + logger.error("unable to retrieve policy id from snapshot metadata [" + snapshot.userMetadata() + "]", e); + throw e; + } + + SnapshotLifecyclePolicy policy = policies.get(policyId); + if (policy == null) { + // This snapshot was taking by a policy that doesn't exist, so it's not eligible + return false; + } + + SnapshotRetentionConfiguration retention = policy.getRetentionPolicy(); + if (retention == null || retention.equals(SnapshotRetentionConfiguration.EMPTY)) { + // Retention is not configured + return false; + } + + final String repository = policy.getRepository(); + // Retrieve the predicate based on the retention policy, passing in snapshots pertaining only to *this* policy and repository + boolean eligible = retention.getSnapshotDeletionPredicate( + allSnapshots.stream() + .filter(t -> t.v1().equals(repository)) + .filter(t -> Optional.ofNullable(t.v2().userMetadata()) + .map(meta -> meta.get(SnapshotLifecyclePolicy.POLICY_ID_METADATA_FIELD)) + .map(pId -> pId.equals(policyId)) + .orElse(false)) + .map(Tuple::v2).collect(Collectors.toList())) + .test(snapshot); + logger.debug("[{}] testing snapshot [{}] deletion eligibility: {}", + repository, snapshot.snapshotId(), eligible ? "ELIGIBLE" : "INELIGIBLE"); + return eligible; } - List getAllSnapshots(Collection repositories) { - // TODO: fill me in - return Collections.emptyList(); + void getAllSnapshots(Collection repositories, ActionListener>> listener, + Consumer errorHandler) { + client.admin().cluster().prepareGetSnapshots(repositories.toArray(Strings.EMPTY_ARRAY)) + .setIgnoreUnavailable(true) + .execute(new ActionListener() { + @Override + public void onResponse(final GetSnapshotsResponse resp) { + listener.onResponse(repositories.stream() + .flatMap(repo -> { + try { + return resp.getSnapshots(repo).stream() + .map(si -> new Tuple<>(repo, si)); + } catch (Exception e) { + logger.debug(new ParameterizedMessage("unable to retrieve snapshots for [{}] repository", repo), e); + return Stream.empty(); + } + }) + .collect(Collectors.toList())); + } + + @Override + public void onFailure(Exception e) { + logger.debug(new ParameterizedMessage("unable to retrieve snapshots for [{}] repositories", repositories), e); + errorHandler.accept(e); + } + }); } - void deleteSnapshots(List snapshotsToDelete) { - // TODO: fill me in - logger.info("deleting {}", snapshotsToDelete); + void deleteSnapshots(List> snapshotsToDelete) { + // TODO: make this more resilient and possibly only delete for a certain amount of time + logger.info("starting snapshot retention deletion for [{}] snapshots", snapshotsToDelete.size()); + CountDownLatch latch = new CountDownLatch(snapshotsToDelete.size()); + snapshotsToDelete.forEach(snap -> { + logger.info("[{}] snapshot retention deleting snapshot [{}]", snap.v1(), snap.v2().snapshotId()); + client.admin().cluster().prepareDeleteSnapshot(snap.v1(), snap.v2().snapshotId().getName()).get(); + }); } } diff --git a/x-pack/plugin/ilm/src/test/java/org/elasticsearch/xpack/slm/SnapshotRetentionTaskTests.java b/x-pack/plugin/ilm/src/test/java/org/elasticsearch/xpack/slm/SnapshotRetentionTaskTests.java new file mode 100644 index 0000000000000..6f3aabdc11781 --- /dev/null +++ b/x-pack/plugin/ilm/src/test/java/org/elasticsearch/xpack/slm/SnapshotRetentionTaskTests.java @@ -0,0 +1,213 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ + +package org.elasticsearch.xpack.slm; + +import org.elasticsearch.action.ActionListener; +import org.elasticsearch.client.Client; +import org.elasticsearch.cluster.ClusterName; +import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.metadata.MetaData; +import org.elasticsearch.cluster.service.ClusterService; +import org.elasticsearch.common.collect.Tuple; +import org.elasticsearch.common.unit.TimeValue; +import org.elasticsearch.snapshots.SnapshotId; +import org.elasticsearch.snapshots.SnapshotInfo; +import org.elasticsearch.test.ClusterServiceUtils; +import org.elasticsearch.test.ESTestCase; +import org.elasticsearch.test.client.NoOpClient; +import org.elasticsearch.threadpool.TestThreadPool; +import org.elasticsearch.threadpool.ThreadPool; +import org.elasticsearch.xpack.core.indexlifecycle.OperationMode; +import org.elasticsearch.xpack.core.scheduler.SchedulerEngine; +import org.elasticsearch.xpack.core.snapshotlifecycle.SnapshotLifecycleMetadata; +import org.elasticsearch.xpack.core.snapshotlifecycle.SnapshotLifecyclePolicy; +import org.elasticsearch.xpack.core.snapshotlifecycle.SnapshotLifecyclePolicyMetadata; +import org.elasticsearch.xpack.core.snapshotlifecycle.SnapshotRetentionConfiguration; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicReference; +import java.util.function.Consumer; +import java.util.function.Function; +import java.util.function.Supplier; +import java.util.stream.Collectors; + +import static org.hamcrest.Matchers.equalTo; + +public class SnapshotRetentionTaskTests extends ESTestCase { + + public void testGetAllPoliciesWithRetentionEnabled() { + SnapshotLifecyclePolicy policyWithout = new SnapshotLifecyclePolicy("policyWithout", "snap", "1 * * * * ?", + "repo", null, SnapshotRetentionConfiguration.EMPTY); + SnapshotLifecyclePolicy policyWithout2 = new SnapshotLifecyclePolicy("policyWithout2", "snap", "1 * * * * ?", + "repo", null, new SnapshotRetentionConfiguration(null)); + SnapshotLifecyclePolicy policyWith = new SnapshotLifecyclePolicy("policyWith", "snap", "1 * * * * ?", + "repo", null, new SnapshotRetentionConfiguration(TimeValue.timeValueDays(30))); + + // Test with no SLM metadata + ClusterState state = ClusterState.builder(new ClusterName("cluster")).build(); + assertThat(SnapshotRetentionTask.getAllPoliciesWithRetentionEnabled(state), equalTo(Collections.emptyMap())); + + // Test with empty SLM metadata + MetaData metaData = MetaData.builder() + .putCustom(SnapshotLifecycleMetadata.TYPE, new SnapshotLifecycleMetadata(Collections.emptyMap(), OperationMode.RUNNING)) + .build(); + state = ClusterState.builder(new ClusterName("cluster")).metaData(metaData).build(); + assertThat(SnapshotRetentionTask.getAllPoliciesWithRetentionEnabled(state), equalTo(Collections.emptyMap())); + + // Test with metadata containing only a policy without retention + state = createState(policyWithout); + assertThat(SnapshotRetentionTask.getAllPoliciesWithRetentionEnabled(state), equalTo(Collections.emptyMap())); + + // Test with metadata containing a couple of policies + state = createState(policyWithout, policyWithout2, policyWith); + Map policyMap = SnapshotRetentionTask.getAllPoliciesWithRetentionEnabled(state); + assertThat(policyMap.size(), equalTo(1)); + assertThat(policyMap.get("policyWith"), equalTo(policyWith)); + } + + public void testSnapshotEligibleForDeletion() { + SnapshotLifecyclePolicy policy = new SnapshotLifecyclePolicy("policy", "snap", "1 * * * * ?", + "repo", null, new SnapshotRetentionConfiguration(TimeValue.timeValueDays(30))); + SnapshotLifecyclePolicy policyWithNoRetention = new SnapshotLifecyclePolicy("policy", "snap", "1 * * * * ?", + "repo", null, randomBoolean() ? null : SnapshotRetentionConfiguration.EMPTY); + Map policyMap = Collections.singletonMap("policy", policy); + Map policyWithNoRetentionMap = Collections.singletonMap("policy", policyWithNoRetention); + Function>> mkInfos = i -> Collections.singletonList(new Tuple<>("repo", i)); + + // Test when user metadata is null + SnapshotInfo info = new SnapshotInfo(new SnapshotId("name", "uuid"), Collections.singletonList("index"), + 0L, "reason", 1L, 1, Collections.emptyList(), true, null); + assertThat(SnapshotRetentionTask.snapshotEligibleForDeletion(info, mkInfos.apply(info), policyMap), equalTo(false)); + + // Test when no retention is configured + info = new SnapshotInfo(new SnapshotId("name", "uuid"), Collections.singletonList("index"), + 0L, "reason", 1L, 1, Collections.emptyList(), true, null); + assertThat(SnapshotRetentionTask.snapshotEligibleForDeletion(info, mkInfos.apply(info), policyWithNoRetentionMap), equalTo(false)); + + // Test when user metadata is a map that doesn't contain "policy" + info = new SnapshotInfo(new SnapshotId("name", "uuid"), Collections.singletonList("index"), + 0L, "reason", 1L, 1, Collections.emptyList(), true, Collections.singletonMap("foo", "bar")); + assertThat(SnapshotRetentionTask.snapshotEligibleForDeletion(info, mkInfos.apply(info), policyMap), equalTo(false)); + + // Test with an ancient snapshot that should be expunged + info = new SnapshotInfo(new SnapshotId("name", "uuid"), Collections.singletonList("index"), + 0L, "reason", 1L, 1, Collections.emptyList(), true, Collections.singletonMap("policy", "policy")); + assertThat(SnapshotRetentionTask.snapshotEligibleForDeletion(info, mkInfos.apply(info), policyMap), equalTo(true)); + + // Test with a snapshot that's start date is old enough to be expunged (but the finish date is not) + long time = System.currentTimeMillis() - TimeValue.timeValueDays(30).millis() - 1; + info = new SnapshotInfo(new SnapshotId("name", "uuid"), Collections.singletonList("index"), + time, "reason", time + TimeValue.timeValueDays(4).millis(), 1, Collections.emptyList(), + true, Collections.singletonMap("policy", "policy")); + assertThat(SnapshotRetentionTask.snapshotEligibleForDeletion(info, mkInfos.apply(info), policyMap), equalTo(true)); + + // Test with a fresh snapshot that should not be expunged + info = new SnapshotInfo(new SnapshotId("name", "uuid"), Collections.singletonList("index"), + System.currentTimeMillis(), "reason", System.currentTimeMillis() + 1, + 1, Collections.emptyList(), true, Collections.singletonMap("policy", "policy")); + assertThat(SnapshotRetentionTask.snapshotEligibleForDeletion(info, mkInfos.apply(info), policyMap), equalTo(false)); + } + + public void testRetentionTask() throws Exception { + try (ThreadPool threadPool = new TestThreadPool("slm-test"); + ClusterService clusterService = ClusterServiceUtils.createClusterService(threadPool); + Client noOpClient = new NoOpClient("slm-test")) { + + SnapshotLifecyclePolicy policy = new SnapshotLifecyclePolicy("policy", "snap", "1 * * * * ?", + "repo", null, new SnapshotRetentionConfiguration(TimeValue.timeValueDays(30))); + + ClusterState state = createState(policy); + ClusterServiceUtils.setState(clusterService, state); + + final SnapshotInfo eligibleSnapshot = new SnapshotInfo(new SnapshotId("name", "uuid"), Collections.singletonList("index"), + 0L, "reason", 1L, 1, Collections.emptyList(), true, Collections.singletonMap("policy", "policy")); + final SnapshotInfo ineligibleSnapshot = new SnapshotInfo(new SnapshotId("name2", "uuid2"), Collections.singletonList("index"), + System.currentTimeMillis(), "reason", System.currentTimeMillis() + 1, 1, + Collections.emptyList(), true, Collections.singletonMap("policy", "policy")); + + AtomicReference> deleted = new AtomicReference<>(); + CountDownLatch latch = new CountDownLatch(1); + MockSnapshotRetentionTask retentionTask = new MockSnapshotRetentionTask(noOpClient, clusterService, + () -> { + List> snaps = new ArrayList<>(2); + snaps.add(new Tuple<>("repo", eligibleSnapshot)); + snaps.add(new Tuple<>("repo", ineligibleSnapshot)); + logger.info("--> retrieving snapshots [{}]", snaps); + return snaps; + }, + snapsToDelete -> { + logger.info("--> deleting {}", snapsToDelete); + deleted.set(snapsToDelete.stream().map(Tuple::v2).collect(Collectors.toList())); + latch.countDown(); + }); + + long time = System.currentTimeMillis(); + retentionTask.triggered(new SchedulerEngine.Event(SnapshotRetentionService.SLM_RETENTION_JOB_ID, time, time)); + + latch.await(10, TimeUnit.SECONDS); + + assertNotNull("something should have been deleted", deleted.get()); + assertThat("one snapshot should have been deleted", deleted.get().size(), equalTo(1)); + assertThat(deleted.get().get(0), equalTo(eligibleSnapshot)); + + threadPool.shutdownNow(); + threadPool.awaitTermination(10, TimeUnit.SECONDS); + } + } + + public ClusterState createState(SnapshotLifecyclePolicy... policies) { + Map policyMetadataMap = Arrays.stream(policies) + .map(policy -> SnapshotLifecyclePolicyMetadata.builder() + .setPolicy(policy) + .setHeaders(Collections.emptyMap()) + .setModifiedDate(randomNonNegativeLong()) + .setVersion(randomNonNegativeLong()) + .build()) + .collect(Collectors.toMap(pm -> pm.getPolicy().getId(), pm -> pm)); + + MetaData metaData = MetaData.builder() + .putCustom(SnapshotLifecycleMetadata.TYPE, new SnapshotLifecycleMetadata(policyMetadataMap, OperationMode.RUNNING)) + .build(); + return ClusterState.builder(new ClusterName("cluster")) + .metaData(metaData) + .build(); + } + + private class MockSnapshotRetentionTask extends SnapshotRetentionTask { + + private final Supplier>> snapshotRetriever; + private final Consumer>> snapshotDeleter; + + MockSnapshotRetentionTask(Client client, + ClusterService clusterService, + Supplier>> snapshotRetriever, + Consumer>> snapshotDeleter) { + super(client, clusterService); + this.snapshotRetriever = snapshotRetriever; + this.snapshotDeleter = snapshotDeleter; + } + + @Override + void getAllSnapshots(Collection repositories, + ActionListener>> listener, + Consumer errorHandler) { + listener.onResponse(this.snapshotRetriever.get()); + } + + @Override + void deleteSnapshots(List> snapshotsToDelete) { + this.snapshotDeleter.accept(snapshotsToDelete); + } + } +}