Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Initialize startup CcrRepositories (#36730) #36919

Merged
merged 3 commits into from
Dec 21, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -117,8 +117,6 @@ public class Ccr extends Plugin implements ActionPlugin, PersistentTaskPlugin, E
private final boolean enabled;
private final Settings settings;
private final CcrLicenseChecker ccrLicenseChecker;

private final SetOnce<CcrRepositoryManager> repositoryManager = new SetOnce<>();
private final SetOnce<CcrRestoreSourceService> restoreSourceService = new SetOnce<>();
private Client client;

Expand Down Expand Up @@ -165,13 +163,12 @@ public Collection<Object> createComponents(
return emptyList();
}

this.repositoryManager.set(new CcrRepositoryManager(settings, clusterService, (NodeClient) client));
CcrRestoreSourceService restoreSourceService = new CcrRestoreSourceService(settings);
this.restoreSourceService.set(restoreSourceService);
return Arrays.asList(
ccrLicenseChecker,
restoreSourceService,
repositoryManager.get(),
new CcrRepositoryManager(settings, clusterService, (NodeClient) client),
new AutoFollowCoordinator(client, clusterService, ccrLicenseChecker, threadPool::relativeTimeInMillis)
);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
import org.elasticsearch.action.support.PlainActionFuture;
import org.elasticsearch.client.node.NodeClient;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.component.AbstractLifecycleComponent;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.transport.RemoteClusterAware;
import org.elasticsearch.xpack.ccr.action.repositories.DeleteInternalCcrRepositoryAction;
Expand All @@ -17,31 +18,70 @@
import org.elasticsearch.xpack.ccr.action.repositories.PutInternalCcrRepositoryRequest;
import org.elasticsearch.xpack.ccr.repository.CcrRepository;

import java.io.IOException;
import java.util.List;
import java.util.Set;

class CcrRepositoryManager extends RemoteClusterAware {
class CcrRepositoryManager extends AbstractLifecycleComponent {

private final NodeClient client;
private final RemoteSettingsUpdateListener updateListener;

CcrRepositoryManager(Settings settings, ClusterService clusterService, NodeClient client) {
super(settings);
this.client = client;
listenForUpdates(clusterService.getClusterSettings());
updateListener = new RemoteSettingsUpdateListener(settings);
updateListener.listenForUpdates(clusterService.getClusterSettings());
}

@Override
protected void updateRemoteCluster(String clusterAlias, List<String> addresses, String proxyAddress) {
String repositoryName = CcrRepository.NAME_PREFIX + clusterAlias;
if (addresses.isEmpty()) {
DeleteInternalCcrRepositoryRequest request = new DeleteInternalCcrRepositoryRequest(repositoryName);
PlainActionFuture<DeleteInternalCcrRepositoryAction.DeleteInternalCcrRepositoryResponse> f = PlainActionFuture.newFuture();
client.executeLocally(DeleteInternalCcrRepositoryAction.INSTANCE, request, f);
assert f.isDone() : "Should be completed as it is executed synchronously";
} else {
PutInternalCcrRepositoryRequest request = new PutInternalCcrRepositoryRequest(repositoryName, CcrRepository.TYPE);
PlainActionFuture<PutInternalCcrRepositoryAction.PutInternalCcrRepositoryResponse> f = PlainActionFuture.newFuture();
client.executeLocally(PutInternalCcrRepositoryAction.INSTANCE, request, f);
assert f.isDone() : "Should be completed as it is executed synchronously";
protected void doStart() {
updateListener.init();
}

@Override
protected void doStop() {
}

@Override
protected void doClose() throws IOException {
}

private void putRepository(String repositoryName) {
PutInternalCcrRepositoryRequest request = new PutInternalCcrRepositoryRequest(repositoryName, CcrRepository.TYPE);
PlainActionFuture<PutInternalCcrRepositoryAction.PutInternalCcrRepositoryResponse> f = PlainActionFuture.newFuture();
client.executeLocally(PutInternalCcrRepositoryAction.INSTANCE, request, f);
assert f.isDone() : "Should be completed as it is executed synchronously";
}

private void deleteRepository(String repositoryName) {
DeleteInternalCcrRepositoryRequest request = new DeleteInternalCcrRepositoryRequest(repositoryName);
PlainActionFuture<DeleteInternalCcrRepositoryAction.DeleteInternalCcrRepositoryResponse> f = PlainActionFuture.newFuture();
client.executeLocally(DeleteInternalCcrRepositoryAction.INSTANCE, request, f);
assert f.isDone() : "Should be completed as it is executed synchronously";
}

private class RemoteSettingsUpdateListener extends RemoteClusterAware {

private RemoteSettingsUpdateListener(Settings settings) {
super(settings);
}

void init() {
Set<String> clusterAliases = buildRemoteClustersDynamicConfig(settings).keySet();
for (String clusterAlias : clusterAliases) {
putRepository(CcrRepository.NAME_PREFIX + clusterAlias);
}
}

@Override
protected void updateRemoteCluster(String clusterAlias, List<String> addresses, String proxy) {
String repositoryName = CcrRepository.NAME_PREFIX + clusterAlias;
if (addresses.isEmpty()) {
deleteRepository(repositoryName);
} else {
putRepository(repositoryName);
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@
import org.elasticsearch.action.admin.cluster.node.tasks.list.ListTasksAction;
import org.elasticsearch.action.admin.cluster.node.tasks.list.ListTasksRequest;
import org.elasticsearch.action.admin.cluster.node.tasks.list.ListTasksResponse;
import org.elasticsearch.action.admin.cluster.settings.ClusterUpdateSettingsRequest;
import org.elasticsearch.action.admin.indices.get.GetIndexResponse;
import org.elasticsearch.action.admin.indices.refresh.RefreshResponse;
import org.elasticsearch.action.admin.indices.settings.put.UpdateSettingsRequest;
Expand Down Expand Up @@ -122,35 +121,31 @@ public final void startClusters() throws Exception {
}

stopClusters();
NodeConfigurationSource nodeConfigurationSource = createNodeConfigurationSource();
Collection<Class<? extends Plugin>> mockPlugins = Arrays.asList(ESIntegTestCase.TestSeedPlugin.class,
TestZenDiscovery.TestPlugin.class, getTestTransportPlugin());

InternalTestCluster leaderCluster = new InternalTestCluster(randomLong(), createTempDir(), true, true, numberOfNodesPerCluster(),
numberOfNodesPerCluster(), UUIDs.randomBase64UUID(random()), nodeConfigurationSource, 0, false, "leader", mockPlugins,
Function.identity());
InternalTestCluster followerCluster = new InternalTestCluster(randomLong(), createTempDir(), true, true, numberOfNodesPerCluster(),
numberOfNodesPerCluster(), UUIDs.randomBase64UUID(random()), nodeConfigurationSource, 0, false, "follower", mockPlugins,
Function.identity());
clusterGroup = new ClusterGroup(leaderCluster, followerCluster);

numberOfNodesPerCluster(), UUIDs.randomBase64UUID(random()), createNodeConfigurationSource(null), 0, false, "leader",
mockPlugins, Function.identity());
leaderCluster.beforeTest(random(), 0.0D);
leaderCluster.ensureAtLeastNumDataNodes(numberOfNodesPerCluster());
assertBusy(() -> {
ClusterService clusterService = leaderCluster.getInstance(ClusterService.class);
assertNotNull(clusterService.state().metaData().custom(LicensesMetaData.TYPE));
});

String address = leaderCluster.getDataNodeInstance(TransportService.class).boundAddress().publishAddress().toString();
InternalTestCluster followerCluster = new InternalTestCluster(randomLong(), createTempDir(), true, true, numberOfNodesPerCluster(),
numberOfNodesPerCluster(), UUIDs.randomBase64UUID(random()), createNodeConfigurationSource(address), 0, false, "follower",
mockPlugins, Function.identity());
clusterGroup = new ClusterGroup(leaderCluster, followerCluster);

followerCluster.beforeTest(random(), 0.0D);
followerCluster.ensureAtLeastNumDataNodes(numberOfNodesPerCluster());
assertBusy(() -> {
ClusterService clusterService = followerCluster.getInstance(ClusterService.class);
assertNotNull(clusterService.state().metaData().custom(LicensesMetaData.TYPE));
});

ClusterUpdateSettingsRequest updateSettingsRequest = new ClusterUpdateSettingsRequest();
String address = leaderCluster.getDataNodeInstance(TransportService.class).boundAddress().publishAddress().toString();
updateSettingsRequest.persistentSettings(Settings.builder().put("cluster.remote.leader_cluster.seeds", address));
assertAcked(followerClient().admin().cluster().updateSettings(updateSettingsRequest).actionGet());
}

/**
Expand Down Expand Up @@ -188,7 +183,7 @@ public void afterTest() throws Exception {
}
}

private NodeConfigurationSource createNodeConfigurationSource() {
private NodeConfigurationSource createNodeConfigurationSource(String leaderSeedAddress) {
Settings.Builder builder = Settings.builder();
builder.put(NodeEnvironment.MAX_LOCAL_STORAGE_NODES_SETTING.getKey(), Integer.MAX_VALUE);
// Default the watermarks to absurdly low to prevent the tests
Expand All @@ -209,6 +204,9 @@ private NodeConfigurationSource createNodeConfigurationSource() {
builder.put(XPackSettings.LOGSTASH_ENABLED.getKey(), false);
builder.put(LicenseService.SELF_GENERATED_LICENSE_TYPE.getKey(), "trial");
builder.put(NetworkModule.HTTP_ENABLED.getKey(), false);
if (leaderSeedAddress != null) {
builder.put("cluster.remote.leader_cluster.seeds", leaderSeedAddress);
}
return new NodeConfigurationSource() {
@Override
public Settings nodeSettings(int nodeOrdinal) {
Expand Down