diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/Ccr.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/Ccr.java index 70d4905d94375..58ba11e4d0488 100644 --- a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/Ccr.java +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/Ccr.java @@ -6,7 +6,6 @@ package org.elasticsearch.xpack.ccr; -import org.apache.lucene.util.SetOnce; import org.elasticsearch.action.ActionRequest; import org.elasticsearch.action.ActionResponse; import org.elasticsearch.client.Client; @@ -111,7 +110,6 @@ public class Ccr extends Plugin implements ActionPlugin, PersistentTaskPlugin, E private final boolean enabled; private final Settings settings; private final CcrLicenseChecker ccrLicenseChecker; - private final SetOnce repositoryManager = new SetOnce<>(); private Client client; /** @@ -152,10 +150,9 @@ public Collection createComponents( return emptyList(); } - this.repositoryManager.set(new CcrRepositoryManager(settings, clusterService, client)); - return Arrays.asList( ccrLicenseChecker, + new CcrRepositoryManager(settings, clusterService, client), new AutoFollowCoordinator(client, clusterService, ccrLicenseChecker, threadPool::relativeTimeInMillis) ); } diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/CcrRepositoryManager.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/CcrRepositoryManager.java index a1504ff2f8acd..54403df367809 100644 --- a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/CcrRepositoryManager.java +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/CcrRepositoryManager.java @@ -10,6 +10,7 @@ import org.elasticsearch.action.support.PlainActionFuture; import org.elasticsearch.client.Client; import org.elasticsearch.cluster.service.ClusterService; +import org.elasticsearch.common.component.AbstractLifecycleComponent; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.transport.RemoteClusterAware; import org.elasticsearch.xpack.ccr.action.repositories.DeleteInternalCcrRepositoryAction; @@ -18,31 +19,70 @@ import org.elasticsearch.xpack.ccr.action.repositories.PutInternalCcrRepositoryRequest; import org.elasticsearch.xpack.ccr.repository.CcrRepository; +import java.io.IOException; import java.util.List; +import java.util.Set; -class CcrRepositoryManager extends RemoteClusterAware { +class CcrRepositoryManager extends AbstractLifecycleComponent { private final Client client; + private final RemoteSettingsUpdateListener updateListener; CcrRepositoryManager(Settings settings, ClusterService clusterService, Client client) { super(settings); this.client = client; - listenForUpdates(clusterService.getClusterSettings()); + updateListener = new RemoteSettingsUpdateListener(settings); + updateListener.listenForUpdates(clusterService.getClusterSettings()); } @Override - protected void updateRemoteCluster(String clusterAlias, List addresses, String proxyAddress) { - String repositoryName = CcrRepository.NAME_PREFIX + clusterAlias; - if (addresses.isEmpty()) { - DeleteInternalCcrRepositoryRequest request = new DeleteInternalCcrRepositoryRequest(repositoryName); - PlainActionFuture f = PlainActionFuture.newFuture(); - client.execute(DeleteInternalCcrRepositoryAction.INSTANCE, request, f); - assert f.isDone() : "Should be completed as it is executed synchronously"; - } else { - ActionRequest request = new PutInternalCcrRepositoryRequest(repositoryName, CcrRepository.TYPE); - PlainActionFuture f = PlainActionFuture.newFuture(); - client.execute(PutInternalCcrRepositoryAction.INSTANCE, request, f); - assert f.isDone() : "Should be completed as it is executed synchronously"; + protected void doStart() { + updateListener.init(); + } + + @Override + protected void doStop() { + } + + @Override + protected void doClose() throws IOException { + } + + private void putRepository(String repositoryName) { + ActionRequest request = new PutInternalCcrRepositoryRequest(repositoryName, CcrRepository.TYPE); + PlainActionFuture f = PlainActionFuture.newFuture(); + client.execute(PutInternalCcrRepositoryAction.INSTANCE, request, f); + assert f.isDone() : "Should be completed as it is executed synchronously"; + } + + private void deleteRepository(String repositoryName) { + DeleteInternalCcrRepositoryRequest request = new DeleteInternalCcrRepositoryRequest(repositoryName); + PlainActionFuture f = PlainActionFuture.newFuture(); + client.execute(DeleteInternalCcrRepositoryAction.INSTANCE, request, f); + assert f.isDone() : "Should be completed as it is executed synchronously"; + } + + private class RemoteSettingsUpdateListener extends RemoteClusterAware { + + private RemoteSettingsUpdateListener(Settings settings) { + super(settings); + } + + void init() { + Set clusterAliases = buildRemoteClustersDynamicConfig(settings).keySet(); + for (String clusterAlias : clusterAliases) { + putRepository(CcrRepository.NAME_PREFIX + clusterAlias); + } + } + + @Override + protected void updateRemoteCluster(String clusterAlias, List addresses, String proxy) { + String repositoryName = CcrRepository.NAME_PREFIX + clusterAlias; + if (addresses.isEmpty()) { + deleteRepository(repositoryName); + } else { + putRepository(repositoryName); + } } } } diff --git a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/CcrIntegTestCase.java b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/CcrIntegTestCase.java index 5abe852ca5ff0..8865c53691786 100644 --- a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/CcrIntegTestCase.java +++ b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/CcrIntegTestCase.java @@ -12,7 +12,6 @@ import org.elasticsearch.action.admin.cluster.node.tasks.list.ListTasksAction; import org.elasticsearch.action.admin.cluster.node.tasks.list.ListTasksRequest; import org.elasticsearch.action.admin.cluster.node.tasks.list.ListTasksResponse; -import org.elasticsearch.action.admin.cluster.settings.ClusterUpdateSettingsRequest; import org.elasticsearch.action.admin.indices.get.GetIndexResponse; import org.elasticsearch.action.admin.indices.refresh.RefreshResponse; import org.elasticsearch.action.admin.indices.settings.put.UpdateSettingsRequest; @@ -117,27 +116,23 @@ public final void startClusters() throws Exception { } stopClusters(); - NodeConfigurationSource nodeConfigurationSource = createNodeConfigurationSource(); Collection> mockPlugins = Arrays.asList(ESIntegTestCase.TestSeedPlugin.class, TestZenDiscovery.TestPlugin.class, MockHttpTransport.TestPlugin.class, getTestTransportPlugin()); InternalTestCluster leaderCluster = new InternalTestCluster(randomLong(), createTempDir(), true, true, numberOfNodesPerCluster(), - numberOfNodesPerCluster(), UUIDs.randomBase64UUID(random()), nodeConfigurationSource, 0, "leader", mockPlugins, + numberOfNodesPerCluster(), UUIDs.randomBase64UUID(random()), createNodeConfigurationSource(null), 0, "leader", mockPlugins, Function.identity()); + leaderCluster.beforeTest(random(), 0.0D); + leaderCluster.ensureAtLeastNumDataNodes(numberOfNodesPerCluster()); + + String address = leaderCluster.getDataNodeInstance(TransportService.class).boundAddress().publishAddress().toString(); InternalTestCluster followerCluster = new InternalTestCluster(randomLong(), createTempDir(), true, true, numberOfNodesPerCluster(), - numberOfNodesPerCluster(), UUIDs.randomBase64UUID(random()), nodeConfigurationSource, 0, "follower", mockPlugins, - Function.identity()); + numberOfNodesPerCluster(), UUIDs.randomBase64UUID(random()), createNodeConfigurationSource(address), 0, "follower", + mockPlugins, Function.identity()); clusterGroup = new ClusterGroup(leaderCluster, followerCluster); - leaderCluster.beforeTest(random(), 0.0D); - leaderCluster.ensureAtLeastNumDataNodes(numberOfNodesPerCluster()); followerCluster.beforeTest(random(), 0.0D); followerCluster.ensureAtLeastNumDataNodes(numberOfNodesPerCluster()); - - ClusterUpdateSettingsRequest updateSettingsRequest = new ClusterUpdateSettingsRequest(); - String address = leaderCluster.getDataNodeInstance(TransportService.class).boundAddress().publishAddress().toString(); - updateSettingsRequest.persistentSettings(Settings.builder().put("cluster.remote.leader_cluster.seeds", address)); - assertAcked(followerClient().admin().cluster().updateSettings(updateSettingsRequest).actionGet()); } /** @@ -175,7 +170,7 @@ public void afterTest() throws Exception { } } - private NodeConfigurationSource createNodeConfigurationSource() { + private NodeConfigurationSource createNodeConfigurationSource(String leaderSeedAddress) { Settings.Builder builder = Settings.builder(); builder.put(NodeEnvironment.MAX_LOCAL_STORAGE_NODES_SETTING.getKey(), Integer.MAX_VALUE); // Default the watermarks to absurdly low to prevent the tests @@ -195,6 +190,9 @@ private NodeConfigurationSource createNodeConfigurationSource() { builder.put(XPackSettings.MACHINE_LEARNING_ENABLED.getKey(), false); builder.put(XPackSettings.LOGSTASH_ENABLED.getKey(), false); builder.put(LicenseService.SELF_GENERATED_LICENSE_TYPE.getKey(), "trial"); + if (leaderSeedAddress != null) { + builder.put("cluster.remote.leader_cluster.seeds", leaderSeedAddress); + } return new NodeConfigurationSource() { @Override public Settings nodeSettings(int nodeOrdinal) {