Skip to content

Commit

Permalink
Move SnapshotsRecoveryPlannerService to its own x-pack plugin (#79637)
Browse files Browse the repository at this point in the history
  • Loading branch information
fcofdez authored Oct 26, 2021
1 parent 07428a1 commit 8430a58
Show file tree
Hide file tree
Showing 27 changed files with 1,546 additions and 926 deletions.
2 changes: 0 additions & 2 deletions qa/snapshot-based-recoveries/build.gradle

This file was deleted.

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ public class ShardSnapshot {
private final Store.MetadataSnapshot metadataSnapshot;
private final org.apache.lucene.util.Version commitLuceneVersion;

ShardSnapshot(ShardSnapshotInfo shardSnapshotInfo,
public ShardSnapshot(ShardSnapshotInfo shardSnapshotInfo,
List<BlobStoreIndexShardSnapshot.FileInfo> snapshotFiles,
Map<String, String> luceneCommitUserData,
org.apache.lucene.util.Version commitLuceneVersion) {
Expand Down
32 changes: 25 additions & 7 deletions server/src/main/java/org/elasticsearch/node/Node.java
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,8 @@
import org.elasticsearch.common.transport.TransportAddress;
import org.elasticsearch.common.util.BigArrays;
import org.elasticsearch.common.util.PageCacheRecycler;
import org.elasticsearch.indices.recovery.plan.SourceOnlyRecoveryPlannerService;
import org.elasticsearch.plugins.RecoveryPlannerPlugin;
import org.elasticsearch.xcontent.NamedXContentRegistry;
import org.elasticsearch.core.Releasables;
import org.elasticsearch.core.TimeValue;
Expand Down Expand Up @@ -118,7 +120,6 @@
import org.elasticsearch.indices.recovery.SnapshotFilesProvider;
import org.elasticsearch.indices.recovery.plan.RecoveryPlannerService;
import org.elasticsearch.indices.recovery.plan.ShardSnapshotsService;
import org.elasticsearch.indices.recovery.plan.SnapshotsRecoveryPlannerService;
import org.elasticsearch.indices.store.IndicesStore;
import org.elasticsearch.ingest.IngestService;
import org.elasticsearch.monitor.MonitorService;
Expand Down Expand Up @@ -697,6 +698,8 @@ protected Node(final Environment initialEnvironment,
final PluginShutdownService pluginShutdownService = new PluginShutdownService(shutdownAwarePlugins);
clusterService.addListener(pluginShutdownService);

final RecoveryPlannerService recoveryPlannerService = getRecoveryPlannerService(threadPool, clusterService, repositoryService);

modules.add(b -> {
b.bind(Node.class).toInstance(this);
b.bind(NodeService.class).toInstance(nodeService);
Expand Down Expand Up @@ -741,12 +744,6 @@ protected Node(final Environment initialEnvironment,
b.bind(Coordinator.class).toInstance(discoveryModule.getCoordinator());
{
processRecoverySettings(settingsModule.getClusterSettings(), recoverySettings);
final ShardSnapshotsService shardSnapshotsService = new ShardSnapshotsService(client,
repositoryService,
threadPool,
clusterService
);
final RecoveryPlannerService recoveryPlannerService = new SnapshotsRecoveryPlannerService(shardSnapshotsService);
final SnapshotFilesProvider snapshotFilesProvider =
new SnapshotFilesProvider(repositoryService);
b.bind(PeerRecoverySourceService.class).toInstance(new PeerRecoverySourceService(transportService,
Expand Down Expand Up @@ -821,6 +818,27 @@ protected Node(final Environment initialEnvironment,
}
}

private RecoveryPlannerService getRecoveryPlannerService(ThreadPool threadPool,
ClusterService clusterService,
RepositoriesService repositoryService) {
final List<RecoveryPlannerPlugin> recoveryPlannerPlugins = pluginsService.filterPlugins(RecoveryPlannerPlugin.class);
if (recoveryPlannerPlugins.isEmpty()) {
return new SourceOnlyRecoveryPlannerService();
}

if (recoveryPlannerPlugins.size() > 1) {
throw new IllegalStateException("A single RecoveryPlannerPlugin was expected but got: " + recoveryPlannerPlugins);
}

final ShardSnapshotsService shardSnapshotsService = new ShardSnapshotsService(client,
repositoryService,
threadPool,
clusterService
);
final RecoveryPlannerPlugin recoveryPlannerPlugin = recoveryPlannerPlugins.get(0);
return recoveryPlannerPlugin.createRecoveryPlannerService(shardSnapshotsService);
}


protected TransportService newTransportService(Settings settings, Transport transport, ThreadPool threadPool,
TransportInterceptor interceptor,
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0 and the Server Side Public License, v 1; you may not use this file except
* in compliance with, at your election, the Elastic License 2.0 or the Server
* Side Public License, v 1.
*/

package org.elasticsearch.plugins;

import org.elasticsearch.indices.recovery.plan.RecoveryPlannerService;
import org.elasticsearch.indices.recovery.plan.ShardSnapshotsService;

/**
* A plugin that allows creating custom {@code RecoveryPlannerService}. Only one plugin of this type
* is allowed to be installed at once.
*/
public interface RecoveryPlannerPlugin {
RecoveryPlannerService createRecoveryPlannerService(ShardSnapshotsService shardSnapshotsService);
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
import org.elasticsearch.index.shard.IndexShardTestCase;
import org.elasticsearch.index.store.Store;
import org.elasticsearch.indices.IndicesService;
import org.elasticsearch.indices.recovery.plan.SnapshotsRecoveryPlannerService;
import org.elasticsearch.indices.recovery.plan.RecoveryPlannerService;
import org.elasticsearch.test.NodeRoles;
import org.elasticsearch.transport.TransportService;

Expand All @@ -37,7 +37,7 @@ public void testDuplicateRecoveries() throws IOException {
PeerRecoverySourceService peerRecoverySourceService = new PeerRecoverySourceService(
mock(TransportService.class), indicesService,
new RecoverySettings(Settings.EMPTY, new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS)),
mock(SnapshotsRecoveryPlannerService.class));
mock(RecoveryPlannerService.class));
StartRecoveryRequest startRecoveryRequest = new StartRecoveryRequest(primary.shardId(), randomAlphaOfLength(10),
getFakeDiscoNode("source"), getFakeDiscoNode("target"), Store.MetadataSnapshot.EMPTY, randomBoolean(), randomLong(),
SequenceNumbers.UNASSIGNED_SEQ_NO, true);
Expand Down
25 changes: 23 additions & 2 deletions server/src/test/java/org/elasticsearch/node/NodeTests.java
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,9 @@
import org.elasticsearch.common.network.NetworkModule;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.BoundTransportAddress;
import org.elasticsearch.indices.recovery.plan.RecoveryPlannerService;
import org.elasticsearch.indices.recovery.plan.ShardSnapshotsService;
import org.elasticsearch.plugins.RecoveryPlannerPlugin;
import org.elasticsearch.xcontent.ContextParser;
import org.elasticsearch.xcontent.MediaType;
import org.elasticsearch.xcontent.NamedObjectNotFoundException;
Expand All @@ -41,7 +44,6 @@
import org.elasticsearch.test.MockHttpTransport;
import org.elasticsearch.test.rest.FakeRestRequest;
import org.elasticsearch.threadpool.ThreadPool;
import org.mockito.Mockito;

import java.io.IOException;
import java.nio.file.Path;
Expand All @@ -63,6 +65,7 @@
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.not;
import static org.hamcrest.Matchers.nullValue;
import static org.mockito.Mockito.mock;

@LuceneTestCase.SuppressFileSystems(value = "ExtrasFS")
public class NodeTests extends ESTestCase {
Expand Down Expand Up @@ -319,6 +322,24 @@ public void testCreateWithCircuitBreakerPlugins() throws IOException {
}
}

public void testNodeFailsToStartWhenThereAreMultipleRecoveryPlannerPluginsLoaded() {
List<Class<? extends Plugin>> plugins = basePlugins();
plugins.add(MockRecoveryPlannerPlugin.class);
plugins.add(MockRecoveryPlannerPlugin.class);
IllegalStateException exception = expectThrows(IllegalStateException.class, () -> new MockNode(baseSettings().build(), plugins));
assertThat(exception.getMessage(), containsString("A single RecoveryPlannerPlugin was expected but got:"));
}

public static class MockRecoveryPlannerPlugin extends Plugin implements RecoveryPlannerPlugin {
public MockRecoveryPlannerPlugin() {
}

@Override
public RecoveryPlannerService createRecoveryPlannerService(ShardSnapshotsService shardSnapshotsService) {
return mock(RecoveryPlannerService.class);
}
}

public static class MockCircuitBreakerPlugin extends Plugin implements CircuitBreakerPlugin {

private SetOnce<CircuitBreaker> myCircuitBreaker = new SetOnce<>();
Expand All @@ -345,7 +366,7 @@ public void setCircuitBreaker(CircuitBreaker circuitBreaker) {

@SuppressWarnings("unchecked")
private static ContextParser<Object, Integer> mockContextParser() {
return Mockito.mock(ContextParser.class);
return mock(ContextParser.class);
}

static NamedXContentRegistry.Entry compatibleEntries = new NamedXContentRegistry.Entry(Integer.class,
Expand Down
Loading

0 comments on commit 8430a58

Please sign in to comment.