Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Handle snapshot lifecycle policy updates and deletions #40062

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,7 @@
import org.elasticsearch.xpack.indexlifecycle.action.TransportStartILMAction;
import org.elasticsearch.xpack.indexlifecycle.action.TransportStopILMAction;
import org.elasticsearch.xpack.snapshotlifecycle.SnapshotLifecycleService;
import org.elasticsearch.xpack.snapshotlifecycle.SnapshotLifecycleTask;
import org.elasticsearch.xpack.snapshotlifecycle.action.DeleteSnapshotLifecycleAction;
import org.elasticsearch.xpack.snapshotlifecycle.action.GetSnapshotLifecycleAction;
import org.elasticsearch.xpack.snapshotlifecycle.action.PutSnapshotLifecycleAction;
Expand Down Expand Up @@ -151,7 +152,8 @@ public Collection<Object> createComponents(Client client, ClusterService cluster
}
indexLifecycleInitialisationService.set(new IndexLifecycleService(settings, client, clusterService, threadPool,
getClock(), System::currentTimeMillis, xContentRegistry));
snapshotLifecycleService.set(new SnapshotLifecycleService(settings, client, clusterService, getClock()));
snapshotLifecycleService.set(new SnapshotLifecycleService(settings,
() -> new SnapshotLifecycleTask(client), clusterService, getClock()));
return Arrays.asList(indexLifecycleInitialisationService.get(), snapshotLifecycleService.get());
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import java.io.IOException;
import java.util.Collections;
import java.util.EnumSet;
import java.util.HashMap;
import java.util.Map;
import java.util.TreeMap;

Expand All @@ -35,7 +36,7 @@ public class SnapshotLifecycleMetadata implements XPackMetaDataCustom {
private final Map<String, SnapshotLifecyclePolicyMetadata> snapshotConfigurations;

public SnapshotLifecycleMetadata(Map<String, SnapshotLifecyclePolicyMetadata> snapshotConfigurations) {
this.snapshotConfigurations = Collections.unmodifiableMap(snapshotConfigurations);
this.snapshotConfigurations = new HashMap<>(snapshotConfigurations);
// TODO: maybe operation mode here so it can be disabled/re-enabled separately like ILM is
}

Expand All @@ -44,7 +45,7 @@ public SnapshotLifecycleMetadata(StreamInput in) throws IOException {
}

public Map<String, SnapshotLifecyclePolicyMetadata> getSnapshotConfigurations() {
return this.snapshotConfigurations;
return Collections.unmodifiableMap(this.snapshotConfigurations);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.client.Client;
import org.elasticsearch.cluster.ClusterChangedEvent;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.ClusterStateListener;
Expand All @@ -23,6 +22,10 @@
import java.io.Closeable;
import java.time.Clock;
import java.util.Map;
import java.util.Set;
import java.util.function.Supplier;
import java.util.regex.Pattern;
import java.util.stream.Collectors;

/**
* {@code SnapshotLifecycleService} manages snapshot policy scheduling and triggering of the
Expand All @@ -32,28 +35,31 @@
public class SnapshotLifecycleService implements LocalNodeMasterListener, Closeable, ClusterStateListener {

private static final Logger logger = LogManager.getLogger(SnapshotLifecycleMetadata.class);
private static final String JOB_PATTERN_SUFFIX = "-\\d+$";

private final SchedulerEngine scheduler;
private final ClusterService clusterService;
private final SnapshotLifecycleTask snapshotTask;
private final Map<String, SchedulerEngine.Job> scheduledTasks = ConcurrentCollections.newConcurrentMap();
private volatile boolean isMaster = false;

public SnapshotLifecycleService(Settings settings, Client client, ClusterService clusterService,
public SnapshotLifecycleService(Settings settings,
Supplier<SnapshotLifecycleTask> taskSupplier,
ClusterService clusterService,
Clock clock) {
this.scheduler = new SchedulerEngine(settings, clock);
this.clusterService = clusterService;
this.snapshotTask = new SnapshotLifecycleTask(client);
this.snapshotTask = taskSupplier.get();
clusterService.addLocalNodeMasterListener(this); // TODO: change this not to use 'this'
clusterService.addListener(this);
}

@Override
public void clusterChanged(ClusterChangedEvent event) {
public void clusterChanged(final ClusterChangedEvent event) {
if (this.isMaster) {
// TODO: handle modified policies (currently they are ignored)
// TODO: handle deleted policies
scheduleSnapshotJobs(event.state());
final ClusterState state = event.state();
scheduleSnapshotJobs(state);
cleanupDeletedPolicies(state);
}
}

Expand All @@ -71,6 +77,11 @@ public void offMaster() {
cancelSnapshotJobs();
}

// Only used for testing
SchedulerEngine getScheduler() {
return this.scheduler;
}

/**
* Schedule all non-scheduled snapshot jobs contained in the cluster state
*/
Expand All @@ -81,35 +92,85 @@ public void scheduleSnapshotJobs(final ClusterState state) {
}
}

public void cleanupDeletedPolicies(final ClusterState state) {
SnapshotLifecycleMetadata snapMeta = state.metaData().custom(SnapshotLifecycleMetadata.TYPE);
if (snapMeta != null) {
// Retrieve all of the expected policy job ids from the policies in the metadata
final Set<String> policyJobIds = snapMeta.getSnapshotConfigurations().values().stream()
.map(SnapshotLifecycleService::getJobId)
.collect(Collectors.toSet());

// Cancel all jobs that are *NOT* in the scheduled tasks map
scheduledTasks.keySet().stream()
.filter(jobId -> policyJobIds.contains(jobId) == false)
.forEach(this::cancelScheduledSnapshot);
}
}

/**
* Schedule the {@link SnapshotLifecyclePolicy} job if it does not already exist. If the job already
* exists it is not interfered with.
* Schedule the {@link SnapshotLifecyclePolicy} job if it does not already exist. First checks
* to see if any previous versions of the policy were scheduled, and if so, cancels those. If
* the same version of a policy has already been scheduled it does not overwrite the job.
*/
public void maybeScheduleSnapshot(final SnapshotLifecyclePolicyMetadata snapshotLifecyclePolicy) {
final String jobId = snapshotLifecyclePolicy.getPolicy().getId();
final String jobId = getJobId(snapshotLifecyclePolicy);
final Pattern existingJobPattern = Pattern.compile(snapshotLifecyclePolicy.getPolicy().getId() + JOB_PATTERN_SUFFIX);

// Find and cancel any existing jobs for this policy
final boolean existingJobsFoundAndCancelled = scheduledTasks.keySet().stream()
// Find all jobs matching the `jobid-\d+` pattern
.filter(jId -> existingJobPattern.matcher(jId).matches())
// Filter out a job that has not been changed (matches the id exactly meaning the version is the same)
.filter(jId -> jId.equals(jobId) == false)
.map(existingJobId -> {
// Cancel existing job so the new one can be scheduled
logger.debug("removing existing snapshot lifecycle job [{}] as it has been updated", existingJobId);
scheduledTasks.remove(existingJobId);
boolean existed = scheduler.remove(existingJobId);
assert existed : "expected job for " + existingJobId + " to exist in scheduler";
return existed;
})
.reduce(false, (a, b) -> a || b);

// Now atomically schedule the new job and add it to the scheduled tasks map. If the jobId
// is identical to an existing job (meaning the version has not changed) then this does
// not reschedule it.
scheduledTasks.computeIfAbsent(jobId, id -> {
final SchedulerEngine.Job job = new SchedulerEngine.Job(jobId,
new CronSchedule(snapshotLifecyclePolicy.getPolicy().getSchedule()));
logger.info("scheduling snapshot lifecycle job [{}]", jobId);
if (existingJobsFoundAndCancelled) {
logger.info("rescheduling updated snapshot lifecycle job [{}]", jobId);
} else {
logger.info("scheduling snapshot lifecycle job [{}]", jobId);
}
scheduler.add(job);
return job;
});
}

/**
* Generate the job id for a given policy metadata. The job id is {@code <policyid>-<version>}
*/
static String getJobId(SnapshotLifecyclePolicyMetadata policyMeta) {
return policyMeta.getPolicy().getId() + "-" + policyMeta.getVersion();
}

/**
* Cancel all scheduled snapshot jobs
*/
public void cancelSnapshotJobs() {
logger.trace("cancelling all snapshot lifecycle jobs");
scheduler.scheduledJobIds().forEach(scheduler::remove);
scheduledTasks.clear();
}

/**
* Cancel the given snapshot lifecycle id
* Cancel the given policy job id (from {@link #getJobId(SnapshotLifecyclePolicyMetadata)}
*/
public void cancelScheduledSnapshot(final String snapshotLifecycleId) {
scheduledTasks.remove(snapshotLifecycleId);
scheduler.remove(snapshotLifecycleId);
public void cancelScheduledSnapshot(final String lifecycleJobId) {
logger.debug("cancelling snapshot lifecycle job [{}] as it no longer exists", lifecycleJobId);
scheduledTasks.remove(lifecycleJobId);
scheduler.remove(lifecycleJobId);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ public class SnapshotLifecycleTask implements SchedulerEngine.Listener {

private final Client client;

SnapshotLifecycleTask(final Client client) {
public SnapshotLifecycleTask(final Client client) {
this.client = client;
}

Expand Down
Loading