Skip to content

Commit

Permalink
Initialize startup CcrRepositories (#36730)
Browse files Browse the repository at this point in the history
Currently, the CcrRepositoryManger only listens for settings updates
and installs new repositories. It does not install the repositories that
are in the initial settings. This commit, modifies the manager to
install the initial repositories. Additionally, it modifies the ccr
integration test to configure the remote leader node at startup, instead
of using a settings update.
  • Loading branch information
Tim-Brooks authored Dec 17, 2018
1 parent 7bf822b commit 3dd5a5a
Show file tree
Hide file tree
Showing 3 changed files with 66 additions and 31 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@

package org.elasticsearch.xpack.ccr;

import org.apache.lucene.util.SetOnce;
import org.elasticsearch.action.ActionRequest;
import org.elasticsearch.action.ActionResponse;
import org.elasticsearch.client.Client;
Expand Down Expand Up @@ -111,7 +110,6 @@ public class Ccr extends Plugin implements ActionPlugin, PersistentTaskPlugin, E
private final boolean enabled;
private final Settings settings;
private final CcrLicenseChecker ccrLicenseChecker;
private final SetOnce<CcrRepositoryManager> repositoryManager = new SetOnce<>();
private Client client;

/**
Expand Down Expand Up @@ -152,10 +150,9 @@ public Collection<Object> createComponents(
return emptyList();
}

this.repositoryManager.set(new CcrRepositoryManager(settings, clusterService, client));

return Arrays.asList(
ccrLicenseChecker,
new CcrRepositoryManager(settings, clusterService, client),
new AutoFollowCoordinator(client, clusterService, ccrLicenseChecker, threadPool::relativeTimeInMillis)
);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
import org.elasticsearch.action.support.PlainActionFuture;
import org.elasticsearch.client.Client;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.component.AbstractLifecycleComponent;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.transport.RemoteClusterAware;
import org.elasticsearch.xpack.ccr.action.repositories.DeleteInternalCcrRepositoryAction;
Expand All @@ -18,31 +19,70 @@
import org.elasticsearch.xpack.ccr.action.repositories.PutInternalCcrRepositoryRequest;
import org.elasticsearch.xpack.ccr.repository.CcrRepository;

import java.io.IOException;
import java.util.List;
import java.util.Set;

class CcrRepositoryManager extends RemoteClusterAware {
class CcrRepositoryManager extends AbstractLifecycleComponent {

private final Client client;
private final RemoteSettingsUpdateListener updateListener;

CcrRepositoryManager(Settings settings, ClusterService clusterService, Client client) {
super(settings);
this.client = client;
listenForUpdates(clusterService.getClusterSettings());
updateListener = new RemoteSettingsUpdateListener(settings);
updateListener.listenForUpdates(clusterService.getClusterSettings());
}

@Override
protected void updateRemoteCluster(String clusterAlias, List<String> addresses, String proxyAddress) {
String repositoryName = CcrRepository.NAME_PREFIX + clusterAlias;
if (addresses.isEmpty()) {
DeleteInternalCcrRepositoryRequest request = new DeleteInternalCcrRepositoryRequest(repositoryName);
PlainActionFuture<DeleteInternalCcrRepositoryAction.DeleteInternalCcrRepositoryResponse> f = PlainActionFuture.newFuture();
client.execute(DeleteInternalCcrRepositoryAction.INSTANCE, request, f);
assert f.isDone() : "Should be completed as it is executed synchronously";
} else {
ActionRequest request = new PutInternalCcrRepositoryRequest(repositoryName, CcrRepository.TYPE);
PlainActionFuture<PutInternalCcrRepositoryAction.PutInternalCcrRepositoryResponse> f = PlainActionFuture.newFuture();
client.execute(PutInternalCcrRepositoryAction.INSTANCE, request, f);
assert f.isDone() : "Should be completed as it is executed synchronously";
protected void doStart() {
updateListener.init();
}

@Override
protected void doStop() {
}

@Override
protected void doClose() throws IOException {
}

private void putRepository(String repositoryName) {
ActionRequest request = new PutInternalCcrRepositoryRequest(repositoryName, CcrRepository.TYPE);
PlainActionFuture<PutInternalCcrRepositoryAction.PutInternalCcrRepositoryResponse> f = PlainActionFuture.newFuture();
client.execute(PutInternalCcrRepositoryAction.INSTANCE, request, f);
assert f.isDone() : "Should be completed as it is executed synchronously";
}

private void deleteRepository(String repositoryName) {
DeleteInternalCcrRepositoryRequest request = new DeleteInternalCcrRepositoryRequest(repositoryName);
PlainActionFuture<DeleteInternalCcrRepositoryAction.DeleteInternalCcrRepositoryResponse> f = PlainActionFuture.newFuture();
client.execute(DeleteInternalCcrRepositoryAction.INSTANCE, request, f);
assert f.isDone() : "Should be completed as it is executed synchronously";
}

private class RemoteSettingsUpdateListener extends RemoteClusterAware {

private RemoteSettingsUpdateListener(Settings settings) {
super(settings);
}

void init() {
Set<String> clusterAliases = buildRemoteClustersDynamicConfig(settings).keySet();
for (String clusterAlias : clusterAliases) {
putRepository(CcrRepository.NAME_PREFIX + clusterAlias);
}
}

@Override
protected void updateRemoteCluster(String clusterAlias, List<String> addresses, String proxy) {
String repositoryName = CcrRepository.NAME_PREFIX + clusterAlias;
if (addresses.isEmpty()) {
deleteRepository(repositoryName);
} else {
putRepository(repositoryName);
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@
import org.elasticsearch.action.admin.cluster.node.tasks.list.ListTasksAction;
import org.elasticsearch.action.admin.cluster.node.tasks.list.ListTasksRequest;
import org.elasticsearch.action.admin.cluster.node.tasks.list.ListTasksResponse;
import org.elasticsearch.action.admin.cluster.settings.ClusterUpdateSettingsRequest;
import org.elasticsearch.action.admin.indices.get.GetIndexResponse;
import org.elasticsearch.action.admin.indices.refresh.RefreshResponse;
import org.elasticsearch.action.admin.indices.settings.put.UpdateSettingsRequest;
Expand Down Expand Up @@ -117,27 +116,23 @@ public final void startClusters() throws Exception {
}

stopClusters();
NodeConfigurationSource nodeConfigurationSource = createNodeConfigurationSource();
Collection<Class<? extends Plugin>> mockPlugins = Arrays.asList(ESIntegTestCase.TestSeedPlugin.class,
TestZenDiscovery.TestPlugin.class, MockHttpTransport.TestPlugin.class, getTestTransportPlugin());

InternalTestCluster leaderCluster = new InternalTestCluster(randomLong(), createTempDir(), true, true, numberOfNodesPerCluster(),
numberOfNodesPerCluster(), UUIDs.randomBase64UUID(random()), nodeConfigurationSource, 0, "leader", mockPlugins,
numberOfNodesPerCluster(), UUIDs.randomBase64UUID(random()), createNodeConfigurationSource(null), 0, "leader", mockPlugins,
Function.identity());
leaderCluster.beforeTest(random(), 0.0D);
leaderCluster.ensureAtLeastNumDataNodes(numberOfNodesPerCluster());

String address = leaderCluster.getDataNodeInstance(TransportService.class).boundAddress().publishAddress().toString();
InternalTestCluster followerCluster = new InternalTestCluster(randomLong(), createTempDir(), true, true, numberOfNodesPerCluster(),
numberOfNodesPerCluster(), UUIDs.randomBase64UUID(random()), nodeConfigurationSource, 0, "follower", mockPlugins,
Function.identity());
numberOfNodesPerCluster(), UUIDs.randomBase64UUID(random()), createNodeConfigurationSource(address), 0, "follower",
mockPlugins, Function.identity());
clusterGroup = new ClusterGroup(leaderCluster, followerCluster);

leaderCluster.beforeTest(random(), 0.0D);
leaderCluster.ensureAtLeastNumDataNodes(numberOfNodesPerCluster());
followerCluster.beforeTest(random(), 0.0D);
followerCluster.ensureAtLeastNumDataNodes(numberOfNodesPerCluster());

ClusterUpdateSettingsRequest updateSettingsRequest = new ClusterUpdateSettingsRequest();
String address = leaderCluster.getDataNodeInstance(TransportService.class).boundAddress().publishAddress().toString();
updateSettingsRequest.persistentSettings(Settings.builder().put("cluster.remote.leader_cluster.seeds", address));
assertAcked(followerClient().admin().cluster().updateSettings(updateSettingsRequest).actionGet());
}

/**
Expand Down Expand Up @@ -175,7 +170,7 @@ public void afterTest() throws Exception {
}
}

private NodeConfigurationSource createNodeConfigurationSource() {
private NodeConfigurationSource createNodeConfigurationSource(String leaderSeedAddress) {
Settings.Builder builder = Settings.builder();
builder.put(NodeEnvironment.MAX_LOCAL_STORAGE_NODES_SETTING.getKey(), Integer.MAX_VALUE);
// Default the watermarks to absurdly low to prevent the tests
Expand All @@ -195,6 +190,9 @@ private NodeConfigurationSource createNodeConfigurationSource() {
builder.put(XPackSettings.MACHINE_LEARNING_ENABLED.getKey(), false);
builder.put(XPackSettings.LOGSTASH_ENABLED.getKey(), false);
builder.put(LicenseService.SELF_GENERATED_LICENSE_TYPE.getKey(), "trial");
if (leaderSeedAddress != null) {
builder.put("cluster.remote.leader_cluster.seeds", leaderSeedAddress);
}
return new NodeConfigurationSource() {
@Override
public Settings nodeSettings(int nodeOrdinal) {
Expand Down

0 comments on commit 3dd5a5a

Please sign in to comment.