diff --git a/server/src/main/java/org/elasticsearch/action/main/TransportMainAction.java b/server/src/main/java/org/elasticsearch/action/main/TransportMainAction.java index d560a7ecc11d3..5020c7dc63a32 100644 --- a/server/src/main/java/org/elasticsearch/action/main/TransportMainAction.java +++ b/server/src/main/java/org/elasticsearch/action/main/TransportMainAction.java @@ -50,7 +50,6 @@ public TransportMainAction(Settings settings, ThreadPool threadPool, TransportSe protected void doExecute(MainRequest request, ActionListener listener) { ClusterState clusterState = clusterService.state(); assert Node.NODE_NAME_SETTING.exists(settings); - final boolean available = clusterState.getBlocks().hasGlobalBlock(RestStatus.SERVICE_UNAVAILABLE) == false; listener.onResponse( new MainResponse(Node.NODE_NAME_SETTING.get(settings), Version.CURRENT, clusterState.getClusterName(), clusterState.metaData().clusterUUID(), Build.CURRENT)); diff --git a/server/src/main/java/org/elasticsearch/node/Node.java b/server/src/main/java/org/elasticsearch/node/Node.java index 4440153dd361e..4b6e622d159d4 100644 --- a/server/src/main/java/org/elasticsearch/node/Node.java +++ b/server/src/main/java/org/elasticsearch/node/Node.java @@ -20,12 +20,14 @@ package org.elasticsearch.node; import org.apache.logging.log4j.Logger; +import org.apache.logging.log4j.ThreadContext; import org.apache.lucene.util.Constants; import org.apache.lucene.util.SetOnce; import org.elasticsearch.Build; import org.elasticsearch.ElasticsearchException; import org.elasticsearch.ElasticsearchTimeoutException; import org.elasticsearch.Version; +import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.ActionModule; import org.elasticsearch.action.Action; import org.elasticsearch.action.search.SearchExecutionStatsCollector; @@ -139,6 +141,7 @@ import org.elasticsearch.tasks.TaskResultsService; import org.elasticsearch.threadpool.ExecutorBuilder; import org.elasticsearch.threadpool.ThreadPool; +import org.elasticsearch.transport.RemoteClusterService; import org.elasticsearch.transport.Transport; import org.elasticsearch.transport.TransportInterceptor; import org.elasticsearch.transport.TransportService; @@ -165,8 +168,10 @@ import java.util.Set; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.function.Consumer; import java.util.function.Function; +import java.util.function.Predicate; import java.util.function.UnaryOperator; import java.util.stream.Collectors; import java.util.stream.Stream; @@ -678,17 +683,37 @@ public Node start() throws NodeValidationException { : "clusterService has a different local node than the factory provided"; transportService.acceptIncomingRequests(); discovery.startInitialJoin(); + final ThreadPool thread = injector.getInstance(ThreadPool.class); final TimeValue initialStateTimeout = DiscoverySettings.INITIAL_STATE_TIMEOUT_SETTING.get(settings); - if (initialStateTimeout.millis() > 0) { - final ThreadPool thread = injector.getInstance(ThreadPool.class); - ClusterState clusterState = clusterService.state(); - ClusterStateObserver observer = new ClusterStateObserver(clusterState, clusterService, null, logger, thread.getThreadContext()); - if (clusterState.nodes().getMasterNodeId() == null) { + final boolean connectToRemote = RemoteClusterService.ENABLE_REMOTE_CLUSTERS.get(settings); + final boolean waitForState = initialStateTimeout.millis() > 0; + Predicate connectRemoteClusterPredicate = state -> "_na_".equals(state.metaData().clusterUUID()) == false; + ClusterState clusterState = clusterService.state(); + AtomicBoolean connectRemoteClusters = new AtomicBoolean(connectToRemote); + if (waitForState) { + CountDownLatch latch = new CountDownLatch(1); + Predicate clusterStatePredicate = state -> state.nodes().getMasterNodeId() != null; + final Consumer consumer; + if (connectToRemote) { + clusterStatePredicate = clusterStatePredicate.and(connectRemoteClusterPredicate); + connectRemoteClusters.set(false); + consumer = c -> transportService.getRemoteClusterService().initializeRemoteClusters(c.metaData().clusterUUID(), + c.metaData().settings(), ActionListener.wrap(v -> latch.countDown(), e -> { + latch.countDown(); + logger.warn("Failed to connect to remote clusters", e); + })); + } else { + consumer = c -> latch.countDown(); + } + if (clusterStatePredicate.test(clusterState) == false) { logger.debug("waiting to join the cluster. timeout [{}]", initialStateTimeout); - final CountDownLatch latch = new CountDownLatch(1); + ClusterStateObserver observer = new ClusterStateObserver(clusterState, clusterService, initialStateTimeout, logger, + thread.getThreadContext()); observer.waitForNextChange(new ClusterStateObserver.Listener() { @Override - public void onNewClusterState(ClusterState state) { latch.countDown(); } + public void onNewClusterState(ClusterState state) { + consumer.accept(state); + } @Override public void onClusterServiceClose() { @@ -701,13 +726,42 @@ public void onTimeout(TimeValue timeout) { initialStateTimeout); latch.countDown(); } - }, state -> state.nodes().getMasterNodeId() != null, initialStateTimeout); + }, clusterStatePredicate); + } else { + consumer.accept(clusterState); + } + try { + latch.await(); + } catch (InterruptedException e) { + throw new ElasticsearchTimeoutException("Interrupted while waiting for initial discovery state"); + } + } + if (connectRemoteClusters.get()) { + Consumer consumer = state -> transportService.getRemoteClusterService().initializeRemoteClusters( + state.metaData().clusterUUID(), state.metaData().settings(), ActionListener.wrap(v -> {}, + e -> logger.warn("Failed to connect to remote clusters", e))); + if (connectRemoteClusterPredicate.test(clusterState) == false) { + ClusterStateObserver observer = new ClusterStateObserver(clusterState, clusterService, null, logger, + thread.getThreadContext()); + // + observer.waitForNextChange(new ClusterStateObserver.Listener() { + @Override + public void onNewClusterState(ClusterState state) { + consumer.accept(state); + } - try { - latch.await(); - } catch (InterruptedException e) { - throw new ElasticsearchTimeoutException("Interrupted while waiting for initial discovery state"); - } + @Override + public void onClusterServiceClose() { + } + + @Override + public void onTimeout(TimeValue timeout) { + assert false; + } + }, connectRemoteClusterPredicate); + + } else { + consumer.accept(clusterState); } } diff --git a/server/src/main/java/org/elasticsearch/transport/RemoteClusterConnection.java b/server/src/main/java/org/elasticsearch/transport/RemoteClusterConnection.java index c86ea61980a87..8cc35ce1c3cca 100644 --- a/server/src/main/java/org/elasticsearch/transport/RemoteClusterConnection.java +++ b/server/src/main/java/org/elasticsearch/transport/RemoteClusterConnection.java @@ -29,10 +29,14 @@ import org.elasticsearch.action.admin.cluster.state.ClusterStateAction; import org.elasticsearch.action.admin.cluster.state.ClusterStateRequest; import org.elasticsearch.action.admin.cluster.state.ClusterStateResponse; +import org.elasticsearch.action.main.MainAction; +import org.elasticsearch.action.main.MainRequest; +import org.elasticsearch.action.main.MainResponse; import org.elasticsearch.cluster.ClusterName; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.node.DiscoveryNodes; import org.elasticsearch.common.component.AbstractComponent; +import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.transport.TransportAddress; import org.elasticsearch.common.unit.TimeValue; @@ -50,6 +54,7 @@ import java.util.HashSet; import java.util.Iterator; import java.util.List; +import java.util.Objects; import java.util.Set; import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.BlockingQueue; @@ -83,11 +88,11 @@ final class RemoteClusterConnection extends AbstractComponent implements Transpo private final String clusterAlias; private final int maxNumRemoteConnections; private final Predicate nodePredicate; + private final String localClusterUUID; private volatile List seedNodes; private volatile boolean skipUnavailable; private final ConnectHandler connectHandler; - private SetOnce remoteClusterName = new SetOnce<>(); - private final ClusterName localClusterName; + private final SetOnce remoteClusterAndUUID = new SetOnce<>(); /** * Creates a new {@link RemoteClusterConnection} @@ -99,10 +104,15 @@ final class RemoteClusterConnection extends AbstractComponent implements Transpo * @param nodePredicate a predicate to filter eligible remote nodes to connect to */ RemoteClusterConnection(Settings settings, String clusterAlias, List seedNodes, - TransportService transportService, int maxNumRemoteConnections, Predicate nodePredicate) { + TransportService transportService, int maxNumRemoteConnections, Predicate nodePredicate, + String localClusterUUID) { super(settings); - this.localClusterName = ClusterName.CLUSTER_NAME_SETTING.get(settings); this.transportService = transportService; + if ("_na_".equals(localClusterUUID)) { + throw new IllegalArgumentException("invalid local clusterstate UUID: " + localClusterUUID); + } + this.localClusterUUID = Objects.requireNonNull(localClusterUUID); + this.maxNumRemoteConnections = maxNumRemoteConnections; this.nodePredicate = nodePredicate; this.clusterAlias = clusterAlias; @@ -312,7 +322,7 @@ public boolean isClosed() { return connectHandler.isClosed(); } - private ConnectionProfile getRemoteProfile(ClusterName name) { + private ConnectionProfile getRemoteProfile() { // we can only compare the cluster name to make a decision if we should use a remote profile // we can't use a cluster UUID here since we could be connecting to that remote cluster before // the remote node has joined its cluster and have a cluster UUID. The fact that we just lose a @@ -320,7 +330,7 @@ private ConnectionProfile getRemoteProfile(ClusterName name) { // have the same name as the local one is minor here. // the alternative here is to complicate the remote infrastructure to also wait until we formed a cluster, // gained a cluster UUID and then start connecting etc. we rather use this simplification in order to maintain simplicity - if (this.localClusterName.equals(name)) { + if (remoteClusterAndUUID.get() != null && this.localClusterUUID.equals(remoteClusterAndUUID.get().clusterUUID)) { return null; } else { return remoteProfile; @@ -438,6 +448,67 @@ protected void doRun() { }); } + Transport.Connection findFirstReadyNode(Iterator seedNodes) throws IOException, InterruptedException { + boolean remoteClusterHasNotFormed = false; + while (seedNodes.hasNext()) { + if (Thread.currentThread().isInterrupted()) { + throw new InterruptedException("remote connect thread got interrupted"); + } + final DiscoveryNode seedNode = seedNodes.next(); + final MainResponse mainResponse; + boolean success = false; + Transport.Connection connection = transportService.openConnection(seedNode, + ConnectionProfile.buildSingleChannelProfile(TransportRequestOptions.Type.REG, null, null)); + ThreadPool threadPool = transportService.getThreadPool(); + ThreadContext threadContext = threadPool.getThreadContext(); + try (ThreadContext.StoredContext ignore = threadContext.stashContext()) { + // we stash any context here since this is an internal execution and should not leak any + // existing context information. + threadContext.markAsSystemContext(); + PlainTransportFuture futureHandler = new PlainTransportFuture<>( + new FutureTransportResponseHandler() { + @Override + public MainResponse read(StreamInput in) throws IOException { + MainResponse response = MainAction.INSTANCE.newResponse(); + response.readFrom(in); + return response; + } + }); + TransportRequestOptions options = TransportRequestOptions.builder().withTimeout(remoteProfile + .getHandshakeTimeout().millis()).build(); + transportService.sendRequest(connection, MainAction.NAME, new MainRequest(), options, + futureHandler); + mainResponse = futureHandler.txGet(); + if ("_na_".equals(mainResponse.getClusterUuid()) == false) { + ClusterNameAndUUID clusterNameAndUUID = remoteClusterAndUUID.get(); + if (clusterNameAndUUID == null) { + remoteClusterAndUUID.set(new ClusterNameAndUUID(mainResponse.getClusterName(), + mainResponse.getClusterUuid())); + } else if (clusterNameAndUUID.clusterName.equals(mainResponse.getClusterName()) == false) { + throw new IllegalStateException("handshake failed, mismatched cluster name [" + mainResponse.getClusterName() + + "] - " + seedNode); + } else if (clusterNameAndUUID.clusterUUID.equals(mainResponse.getClusterUuid()) == false) { + throw new IllegalStateException("handshake failed, mismatched cluster UUID [" + mainResponse.getClusterUuid() + + "] - " + seedNode); + } + success = true; + return connection; + } else { + remoteClusterHasNotFormed = true; + } + } finally { + if (success == false) { + connection.close(); + } + } + } + if (remoteClusterHasNotFormed) { + throw new IllegalStateException("seed nodes have not joined a cluster yet"); + } else { + throw new IllegalStateException("no seed node left"); + } + } + void collectRemoteNodes(Iterator seedNodes, final TransportService transportService, ActionListener listener) { if (Thread.currentThread().isInterrupted()) { @@ -446,28 +517,23 @@ void collectRemoteNodes(Iterator seedNodes, try { if (seedNodes.hasNext()) { cancellableThreads.executeIO(() -> { - final DiscoveryNode seedNode = seedNodes.next(); final TransportService.HandshakeResponse handshakeResponse; - Transport.Connection connection = transportService.openConnection(seedNode, - ConnectionProfile.buildSingleChannelProfile(TransportRequestOptions.Type.REG, null, null)); + Transport.Connection connection = findFirstReadyNode(seedNodes); boolean success = false; try { try { handshakeResponse = transportService.handshake(connection, remoteProfile.getHandshakeTimeout().millis(), - (c) -> remoteClusterName.get() == null ? true : c.equals(remoteClusterName.get())); + (c) -> c.equals(remoteClusterAndUUID.get().clusterName)); } catch (IllegalStateException ex) { logger.warn(() -> new ParameterizedMessage("seed node {} cluster name mismatch expected " + - "cluster name {}", connection.getNode(), remoteClusterName.get()), ex); + "cluster name {}", connection.getNode(), remoteClusterAndUUID.get()), ex); throw ex; } final DiscoveryNode handshakeNode = handshakeResponse.getDiscoveryNode(); if (nodePredicate.test(handshakeNode) && connectedNodes.size() < maxNumRemoteConnections) { - transportService.connectToNode(handshakeNode, getRemoteProfile(handshakeResponse.getClusterName())); - if (remoteClusterName.get() == null) { - assert handshakeResponse.getClusterName().value() != null; - remoteClusterName.set(handshakeResponse.getClusterName()); - } + transportService.connectToNode(handshakeNode, getRemoteProfile()); + assert handshakeResponse.getClusterName().value() != null; connectedNodes.add(handshakeNode); } ClusterStateRequest request = new ClusterStateRequest(); @@ -557,10 +623,6 @@ public ClusterStateResponse newInstance() { @Override public void handleResponse(ClusterStateResponse response) { try { - if (remoteClusterName.get() == null) { - assert response.getClusterName().value() != null; - remoteClusterName.set(response.getClusterName()); - } try (Closeable theConnection = connection) { // the connection is unused - see comment in #collectRemoteNodes // we have to close this connection before we notify listeners - this is mainly needed for test correctness // since if we do it afterwards we might fail assertions that check if all high level connections are closed. @@ -573,7 +635,7 @@ public void handleResponse(ClusterStateResponse response) { for (DiscoveryNode node : nodesIter) { if (nodePredicate.test(node) && connectedNodes.size() < maxNumRemoteConnections) { try { - transportService.connectToNode(node, getRemoteProfile(remoteClusterName.get())); // noop if node is + transportService.connectToNode(node, getRemoteProfile()); // noop if node is // connected connectedNodes.add(node); } catch (ConnectTransportException | IllegalStateException ex) { @@ -696,4 +758,14 @@ private synchronized void ensureIteratorAvailable() { } } } + + private static class ClusterNameAndUUID { + final ClusterName clusterName; + final String clusterUUID; + + private ClusterNameAndUUID(ClusterName clusterName, String clusterUUID) { + this.clusterName = clusterName; + this.clusterUUID = clusterUUID; + } + } } diff --git a/server/src/main/java/org/elasticsearch/transport/RemoteClusterService.java b/server/src/main/java/org/elasticsearch/transport/RemoteClusterService.java index a07de63d53734..89891f6f5f0d3 100644 --- a/server/src/main/java/org/elasticsearch/transport/RemoteClusterService.java +++ b/server/src/main/java/org/elasticsearch/transport/RemoteClusterService.java @@ -18,6 +18,7 @@ */ package org.elasticsearch.transport; +import org.apache.lucene.util.SetOnce; import org.elasticsearch.Version; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.OriginalIndices; @@ -49,6 +50,7 @@ import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicReference; import java.util.function.BiFunction; import java.util.function.Function; @@ -103,6 +105,7 @@ public final class RemoteClusterService extends RemoteClusterAware implements Cl private final TransportService transportService; private final int numRemoteConnections; private volatile Map remoteClusters = Collections.emptyMap(); + private final SetOnce localCusterUUID = new SetOnce<>(); RemoteClusterService(Settings settings, TransportService transportService) { super(settings); @@ -116,6 +119,10 @@ public final class RemoteClusterService extends RemoteClusterAware implements Cl * @param connectionListener a listener invoked once every configured cluster has been connected to */ private synchronized void updateRemoteClusters(Map> seeds, ActionListener connectionListener) { + if (localCusterUUID.get() == null) { + connectionListener.onFailure(new IllegalStateException("RemoteClusterService is not initialized no cluster uuid set")); + return; + } if (seeds.containsKey(LOCAL_CLUSTER_GROUP_KEY)) { throw new IllegalArgumentException("remote clusters must not have the empty string as its key"); } @@ -139,7 +146,7 @@ private synchronized void updateRemoteClusters(Map> if (remote == null) { // this is a new cluster we have to add a new representation remote = new RemoteClusterConnection(settings, entry.getKey(), entry.getValue(), transportService, numRemoteConnections, - getNodePredicate(settings)); + getNodePredicate(settings), localCusterUUID.get()); remoteClusters.put(entry.getKey(), remote); } @@ -327,26 +334,18 @@ void updateRemoteCluster( updateRemoteClusters(Collections.singletonMap(clusterAlias, nodes), connectionListener); } - /** - * Connects to all remote clusters in a blocking fashion. This should be called on node startup to establish an initial connection - * to all configured seed nodes. - */ - void initializeRemoteClusters() { - final TimeValue timeValue = REMOTE_INITIAL_CONNECTION_TIMEOUT_SETTING.get(settings); - final PlainActionFuture future = new PlainActionFuture<>(); - Map> seeds = RemoteClusterAware.buildRemoteClustersSeeds(settings); - updateRemoteClusters(seeds, future); - try { - future.get(timeValue.millis(), TimeUnit.MILLISECONDS); - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - } catch (TimeoutException ex) { - logger.warn("failed to connect to remote clusters within {}", timeValue.toString()); - } catch (Exception e) { - throw new IllegalStateException("failed to connect to remote clusters", e); + + public void initializeRemoteClusters(String localClusterUUID, Settings clusterSettings, ActionListener listener) { + if (localClusterUUID.equals("_na_")) { + throw new IllegalArgumentException("invalid local cluster uuid: " + localClusterUUID); } + this.localCusterUUID.set(localClusterUUID); + Map> seeds = RemoteClusterAware.buildRemoteClustersSeeds(Settings.builder().put(settings, false) + .put(clusterSettings).build()); + updateRemoteClusters(seeds, listener); } + @Override public void close() throws IOException { IOUtils.close(remoteClusters.values()); diff --git a/server/src/main/java/org/elasticsearch/transport/TransportService.java b/server/src/main/java/org/elasticsearch/transport/TransportService.java index 656d8c3841769..6026cd51d1ee4 100644 --- a/server/src/main/java/org/elasticsearch/transport/TransportService.java +++ b/server/src/main/java/org/elasticsearch/transport/TransportService.java @@ -233,10 +233,6 @@ protected void doStart() { false, false, (request, channel) -> channel.sendResponse( new HandshakeResponse(localNode, clusterName, localNode.getVersion()))); - if (connectToRemoteCluster) { - // here we start to connect to the remote clusters - remoteClusterService.initializeRemoteClusters(); - } } @Override @@ -413,7 +409,6 @@ public HandshakeResponse newInstance() { } catch (Exception e) { throw new IllegalStateException("handshake failed with " + node, e); } - if (!clusterNamePredicate.test(response.clusterName)) { throw new IllegalStateException("handshake failed, mismatched cluster name [" + response.clusterName + "] - " + node); } else if (response.version.isCompatible(localNode.getVersion()) == false) { diff --git a/server/src/test/java/org/elasticsearch/transport/RemoteClusterConnectionTests.java b/server/src/test/java/org/elasticsearch/transport/RemoteClusterConnectionTests.java index 637b8fb26a880..1ce3b4c4be9ad 100644 --- a/server/src/test/java/org/elasticsearch/transport/RemoteClusterConnectionTests.java +++ b/server/src/test/java/org/elasticsearch/transport/RemoteClusterConnectionTests.java @@ -19,6 +19,7 @@ package org.elasticsearch.transport; import org.apache.lucene.store.AlreadyClosedException; +import org.elasticsearch.Build; import org.elasticsearch.Version; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.LatchedActionListener; @@ -29,6 +30,9 @@ import org.elasticsearch.action.admin.cluster.state.ClusterStateAction; import org.elasticsearch.action.admin.cluster.state.ClusterStateRequest; import org.elasticsearch.action.admin.cluster.state.ClusterStateResponse; +import org.elasticsearch.action.main.MainAction; +import org.elasticsearch.action.main.MainRequest; +import org.elasticsearch.action.main.MainResponse; import org.elasticsearch.action.search.SearchRequest; import org.elasticsearch.cluster.ClusterName; import org.elasticsearch.cluster.ClusterState; @@ -38,6 +42,7 @@ import org.elasticsearch.common.SuppressForbidden; import org.elasticsearch.common.io.stream.BytesStreamOutput; import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.io.stream.Writeable; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.transport.TransportAddress; import org.elasticsearch.common.unit.TimeValue; @@ -102,6 +107,10 @@ public static MockTransportService startTransport(String id, List return startTransport(id, knownNodes, version, threadPool, Settings.EMPTY); } + private String clusterUUID() { + return randomBoolean() ? ClusterName.DEFAULT.value() : randomAlphaOfLengthBetween(1, 10); + } + public static MockTransportService startTransport( final String id, final List knownNodes, @@ -113,7 +122,7 @@ public static MockTransportService startTransport( ClusterName clusterName = ClusterName.CLUSTER_NAME_SETTING.get(s); MockTransportService newService = MockTransportService.createNewService(s, version, threadPool, null); try { - newService.registerRequestHandler(ClusterSearchShardsAction.NAME,ThreadPool.Names.SAME, ClusterSearchShardsRequest::new, + newService.registerRequestHandler(ClusterSearchShardsAction.NAME, ThreadPool.Names.SAME, ClusterSearchShardsRequest::new, (request, channel) -> { if ("index_not_found".equals(request.preference())) { channel.sendResponse(new IndexNotFoundException("index")); @@ -122,6 +131,13 @@ public static MockTransportService startTransport( knownNodes.toArray(new DiscoveryNode[0]), Collections.emptyMap())); } }); + Writeable.Reader reader = stream -> { + MainRequest mainRequest = new MainRequest(); + mainRequest.readFrom(stream); + return mainRequest; + }; + newService.registerRequestHandler(MainAction.NAME, ThreadPool.Names.SAME, reader , (request, channel) + -> channel.sendResponse(new MainResponse("somenode", version, clusterName, clusterName.value(), Build.CURRENT))); newService.registerRequestHandler(ClusterStateAction.NAME, ThreadPool.Names.SAME, ClusterStateRequest::new, (request, channel) -> { DiscoveryNodes.Builder builder = DiscoveryNodes.builder(); @@ -142,6 +158,30 @@ public static MockTransportService startTransport( } } + public void testWontConnectUnformedCluster() throws Exception { + List knownNodes = new CopyOnWriteArrayList<>(); + try (MockTransportService seedTransport = startTransport("seed_node", knownNodes, Version.CURRENT, threadPool, + Settings.builder().put("cluster.name", "_na_").build()); + MockTransportService discoverableTransport = startTransport("discoverable_node", knownNodes, Version.CURRENT)) { + DiscoveryNode seedNode = seedTransport.getLocalDiscoNode(); + DiscoveryNode discoverableNode = discoverableTransport.getLocalDiscoNode(); + knownNodes.add(seedTransport.getLocalDiscoNode()); + knownNodes.add(discoverableTransport.getLocalDiscoNode()); + Collections.shuffle(knownNodes, random()); + + try (MockTransportService service = MockTransportService.createNewService(Settings.EMPTY, Version.CURRENT, threadPool, null)) { + service.start(); + service.acceptIncomingRequests(); + try (RemoteClusterConnection connection = new RemoteClusterConnection(Settings.EMPTY, "test", + Arrays.asList(seedNode), service, Integer.MAX_VALUE, n -> true, clusterUUID())) { + IllegalStateException ise = expectThrows(IllegalStateException.class, + () -> updateSeedNodes(connection, Arrays.asList(seedNode))); + assertEquals("seed nodes have not joined a cluster yet", ise.getMessage()); + } + } + } + } + public void testLocalProfileIsUsedForLocalCluster() throws Exception { List knownNodes = new CopyOnWriteArrayList<>(); try (MockTransportService seedTransport = startTransport("seed_node", knownNodes, Version.CURRENT); @@ -156,7 +196,7 @@ public void testLocalProfileIsUsedForLocalCluster() throws Exception { service.start(); service.acceptIncomingRequests(); try (RemoteClusterConnection connection = new RemoteClusterConnection(Settings.EMPTY, "test-cluster", - Arrays.asList(seedNode), service, Integer.MAX_VALUE, n -> true)) { + Arrays.asList(seedNode), service, Integer.MAX_VALUE, n -> true, "elasticsearch")) { updateSeedNodes(connection, Arrays.asList(seedNode)); assertTrue(service.nodeConnected(seedNode)); assertTrue(service.nodeConnected(discoverableNode)); @@ -196,7 +236,7 @@ public void testRemoteProfileIsUsedForRemoteCluster() throws Exception { service.start(); service.acceptIncomingRequests(); try (RemoteClusterConnection connection = new RemoteClusterConnection(Settings.EMPTY, "test-cluster", - Arrays.asList(seedNode), service, Integer.MAX_VALUE, n -> true)) { + Arrays.asList(seedNode), service, Integer.MAX_VALUE, n -> true, clusterUUID())) { updateSeedNodes(connection, Arrays.asList(seedNode)); assertTrue(service.nodeConnected(seedNode)); assertTrue(service.nodeConnected(discoverableNode)); @@ -252,7 +292,7 @@ public void testDiscoverSingleNode() throws Exception { service.start(); service.acceptIncomingRequests(); try (RemoteClusterConnection connection = new RemoteClusterConnection(Settings.EMPTY, "test-cluster", - Arrays.asList(seedNode), service, Integer.MAX_VALUE, n -> true)) { + Arrays.asList(seedNode), service, Integer.MAX_VALUE, n -> true, clusterUUID())) { updateSeedNodes(connection, Arrays.asList(seedNode)); assertTrue(service.nodeConnected(seedNode)); assertTrue(service.nodeConnected(discoverableNode)); @@ -281,7 +321,7 @@ public void testDiscoverSingleNodeWithIncompatibleSeed() throws Exception { service.start(); service.acceptIncomingRequests(); try (RemoteClusterConnection connection = new RemoteClusterConnection(Settings.EMPTY, "test-cluster", - seedNodes, service, Integer.MAX_VALUE, n -> true)) { + seedNodes, service, Integer.MAX_VALUE, n -> true, clusterUUID())) { updateSeedNodes(connection, seedNodes); assertTrue(service.nodeConnected(seedNode)); assertTrue(service.nodeConnected(discoverableNode)); @@ -308,7 +348,7 @@ public void testNodeDisconnected() throws Exception { service.start(); service.acceptIncomingRequests(); try (RemoteClusterConnection connection = new RemoteClusterConnection(Settings.EMPTY, "test-cluster", - Arrays.asList(seedNode), service, Integer.MAX_VALUE, n -> true)) { + Arrays.asList(seedNode), service, Integer.MAX_VALUE, n -> true, clusterUUID())) { updateSeedNodes(connection, Arrays.asList(seedNode)); assertTrue(service.nodeConnected(seedNode)); assertTrue(service.nodeConnected(discoverableNode)); @@ -357,7 +397,7 @@ public void testFilterDiscoveredNodes() throws Exception { service.start(); service.acceptIncomingRequests(); try (RemoteClusterConnection connection = new RemoteClusterConnection(Settings.EMPTY, "test-cluster", - Arrays.asList(seedNode), service, Integer.MAX_VALUE, n -> n.equals(rejectedNode) == false)) { + Arrays.asList(seedNode), service, Integer.MAX_VALUE, n -> n.equals(rejectedNode) == false, clusterUUID())) { updateSeedNodes(connection, Arrays.asList(seedNode)); if (rejectedNode.equals(seedNode)) { assertFalse(service.nodeConnected(seedNode)); @@ -396,7 +436,7 @@ public void testConnectWithIncompatibleTransports() throws Exception { service.start(); service.acceptIncomingRequests(); try (RemoteClusterConnection connection = new RemoteClusterConnection(Settings.EMPTY, "test-cluster", - Arrays.asList(seedNode), service, Integer.MAX_VALUE, n -> true)) { + Arrays.asList(seedNode), service, Integer.MAX_VALUE, n -> true, clusterUUID())) { expectThrows(Exception.class, () -> updateSeedNodes(connection, Arrays.asList(seedNode))); assertFalse(service.nodeConnected(seedNode)); assertTrue(connection.assertNoRunningConnections()); @@ -450,7 +490,7 @@ public Connection getConnection(DiscoveryNode node) { service.start(); service.acceptIncomingRequests(); try (RemoteClusterConnection connection = new RemoteClusterConnection(Settings.EMPTY, "test-cluster", - Arrays.asList(seedNode), service, Integer.MAX_VALUE, n -> true)) { + Arrays.asList(seedNode), service, Integer.MAX_VALUE, n -> true, clusterUUID())) { connection.addConnectedNode(seedNode); for (DiscoveryNode node : knownNodes) { final Transport.Connection transportConnection = connection.getConnection(node); @@ -493,7 +533,7 @@ public void run() { CountDownLatch listenerCalled = new CountDownLatch(1); AtomicReference exceptionReference = new AtomicReference<>(); try (RemoteClusterConnection connection = new RemoteClusterConnection(Settings.EMPTY, "test-cluster", - Arrays.asList(seedNode), service, Integer.MAX_VALUE, n -> true)) { + Arrays.asList(seedNode), service, Integer.MAX_VALUE, n -> true, clusterUUID())) { ActionListener listener = ActionListener.wrap(x -> { listenerCalled.countDown(); fail("expected exception"); @@ -528,7 +568,7 @@ public void testFetchShards() throws Exception { service.acceptIncomingRequests(); List nodes = Collections.singletonList(seedNode); try (RemoteClusterConnection connection = new RemoteClusterConnection(Settings.EMPTY, "test-cluster", - nodes, service, Integer.MAX_VALUE, n -> true)) { + nodes, service, Integer.MAX_VALUE, n -> true, clusterUUID())) { if (randomBoolean()) { updateSeedNodes(connection, nodes); } @@ -564,7 +604,7 @@ public void testFetchShardsSkipUnavailable() throws Exception { service.start(); service.acceptIncomingRequests(); try (RemoteClusterConnection connection = new RemoteClusterConnection(Settings.EMPTY, "test-cluster", - Collections.singletonList(seedNode), service, Integer.MAX_VALUE, n -> true)) { + Collections.singletonList(seedNode), service, Integer.MAX_VALUE, n -> true, clusterUUID())) { SearchRequest request = new SearchRequest("test-index"); ClusterSearchShardsRequest searchShardsRequest = new ClusterSearchShardsRequest("test-index") @@ -672,7 +712,7 @@ public void testTriggerUpdatesConcurrently() throws IOException, InterruptedExce service.start(); service.acceptIncomingRequests(); try (RemoteClusterConnection connection = new RemoteClusterConnection(Settings.EMPTY, "test-cluster", - seedNodes, service, Integer.MAX_VALUE, n -> true)) { + seedNodes, service, Integer.MAX_VALUE, n -> true, clusterUUID())) { int numThreads = randomIntBetween(4, 10); Thread[] threads = new Thread[numThreads]; CyclicBarrier barrier = new CyclicBarrier(numThreads); @@ -749,7 +789,7 @@ public void testCloseWhileConcurrentlyConnecting() throws IOException, Interrupt service.start(); service.acceptIncomingRequests(); try (RemoteClusterConnection connection = new RemoteClusterConnection(Settings.EMPTY, "test-cluster", - seedNodes, service, Integer.MAX_VALUE, n -> true)) { + seedNodes, service, Integer.MAX_VALUE, n -> true, clusterUUID())) { int numThreads = randomIntBetween(4, 10); Thread[] threads = new Thread[numThreads]; CyclicBarrier barrier = new CyclicBarrier(numThreads + 1); @@ -832,7 +872,7 @@ public void testGetConnectionInfo() throws Exception { service.acceptIncomingRequests(); int maxNumConnections = randomIntBetween(1, 5); try (RemoteClusterConnection connection = new RemoteClusterConnection(Settings.EMPTY, "test-cluster", - seedNodes, service, maxNumConnections, n -> true)) { + seedNodes, service, maxNumConnections, n -> true, clusterUUID())) { // test no nodes connected RemoteConnectionInfo remoteConnectionInfo = assertSerialization(connection.getConnectionInfo()); assertNotNull(remoteConnectionInfo); @@ -979,7 +1019,7 @@ public void testEnsureConnected() throws IOException, InterruptedException { service.start(); service.acceptIncomingRequests(); try (RemoteClusterConnection connection = new RemoteClusterConnection(Settings.EMPTY, "test-cluster", - Arrays.asList(seedNode), service, Integer.MAX_VALUE, n -> true)) { + Arrays.asList(seedNode), service, Integer.MAX_VALUE, n -> true, clusterUUID())) { assertFalse(service.nodeConnected(seedNode)); assertFalse(service.nodeConnected(discoverableNode)); assertTrue(connection.assertNoRunningConnections()); @@ -1028,7 +1068,7 @@ public void testCollectNodes() throws Exception { service.start(); service.acceptIncomingRequests(); try (RemoteClusterConnection connection = new RemoteClusterConnection(Settings.EMPTY, "test-cluster", - Arrays.asList(seedNode), service, Integer.MAX_VALUE, n -> true)) { + Arrays.asList(seedNode), service, Integer.MAX_VALUE, n -> true, clusterUUID())) { if (randomBoolean()) { updateSeedNodes(connection, Arrays.asList(seedNode)); } @@ -1076,7 +1116,7 @@ public void testConnectedNodesConcurrentAccess() throws IOException, Interrupted service.start(); service.acceptIncomingRequests(); try (RemoteClusterConnection connection = new RemoteClusterConnection(Settings.EMPTY, "test-cluster", - seedNodes, service, Integer.MAX_VALUE, n -> true)) { + seedNodes, service, Integer.MAX_VALUE, n -> true, clusterUUID())) { final int numGetThreads = randomIntBetween(4, 10); final Thread[] getThreads = new Thread[numGetThreads]; final int numModifyingThreads = randomIntBetween(4, 10); @@ -1166,7 +1206,7 @@ public void testClusterNameIsChecked() throws Exception { service.start(); service.acceptIncomingRequests(); try (RemoteClusterConnection connection = new RemoteClusterConnection(Settings.EMPTY, "test-cluster", - Arrays.asList(seedNode), service, Integer.MAX_VALUE, n -> true)) { + Arrays.asList(seedNode), service, Integer.MAX_VALUE, n -> true, clusterUUID())) { updateSeedNodes(connection, Arrays.asList(seedNode)); assertTrue(service.nodeConnected(seedNode)); assertTrue(service.nodeConnected(discoverableNode)); @@ -1183,9 +1223,9 @@ public void testClusterNameIsChecked() throws Exception { assertTrue(connection.assertNoRunningConnections()); IllegalStateException illegalStateException = expectThrows(IllegalStateException.class, () -> updateSeedNodes(connection, Arrays.asList(otherClusterTransport.getLocalDiscoNode()))); - assertThat(illegalStateException.getMessage(), + assertThat(illegalStateException.getMessage(), startsWith("handshake failed, mismatched cluster name [Cluster [otherCluster]]" + - " - {other_cluster_discoverable_node}")); + " - " + otherClusterTransport.getLocalDiscoNode())); } } } @@ -1239,7 +1279,7 @@ public boolean nodeConnected(DiscoveryNode node) { service.start(); service.acceptIncomingRequests(); try (RemoteClusterConnection connection = new RemoteClusterConnection(Settings.EMPTY, "test-cluster", - Collections.singletonList(connectedNode), service, Integer.MAX_VALUE, n -> true)) { + Collections.singletonList(connectedNode), service, Integer.MAX_VALUE, n -> true, clusterUUID())) { connection.addConnectedNode(connectedNode); for (int i = 0; i < 10; i++) { //always a direct connection as the remote node is already connected diff --git a/server/src/test/java/org/elasticsearch/transport/RemoteClusterServiceTests.java b/server/src/test/java/org/elasticsearch/transport/RemoteClusterServiceTests.java index 03d76b5a953c6..f6b8988752ca8 100644 --- a/server/src/test/java/org/elasticsearch/transport/RemoteClusterServiceTests.java +++ b/server/src/test/java/org/elasticsearch/transport/RemoteClusterServiceTests.java @@ -24,6 +24,7 @@ import org.elasticsearch.action.OriginalIndices; import org.elasticsearch.action.admin.cluster.shards.ClusterSearchShardsResponse; import org.elasticsearch.action.support.IndicesOptions; +import org.elasticsearch.cluster.ClusterName; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.common.settings.AbstractScopedSettings; import org.elasticsearch.common.settings.ClusterSettings; @@ -127,7 +128,7 @@ public void testBuiltRemoteClustersSeeds() throws Exception { } - public void testGroupClusterIndices() throws IOException { + public void testGroupClusterIndices() throws IOException, InterruptedException { List knownNodes = new CopyOnWriteArrayList<>(); try (MockTransportService seedTransport = startTransport("cluster_1_node", knownNodes, Version.CURRENT); MockTransportService otherSeedTransport = startTransport("cluster_2_node", knownNodes, Version.CURRENT)) { @@ -137,8 +138,7 @@ public void testGroupClusterIndices() throws IOException { knownNodes.add(otherSeedTransport.getLocalDiscoNode()); Collections.shuffle(knownNodes, random()); - try (MockTransportService transportService = MockTransportService.createNewService(Settings.EMPTY, Version.CURRENT, threadPool, - null)) { + try (MockTransportService transportService = startTransport("test", Collections.emptyList(), Version.CURRENT, Settings.EMPTY)) { transportService.start(); transportService.acceptIncomingRequests(); Settings.Builder builder = Settings.builder(); @@ -146,7 +146,7 @@ public void testGroupClusterIndices() throws IOException { builder.putList("search.remote.cluster_2.seeds", otherSeedNode.getAddress().toString()); try (RemoteClusterService service = new RemoteClusterService(builder.build(), transportService)) { assertFalse(service.isCrossClusterSearchEnabled()); - service.initializeRemoteClusters(); + initializedRemoteClusterService(service); assertTrue(service.isCrossClusterSearchEnabled()); assertTrue(service.isRemoteClusterRegistered("cluster_1")); assertTrue(service.isRemoteClusterRegistered("cluster_2")); @@ -173,7 +173,7 @@ public void testGroupClusterIndices() throws IOException { } } - public void testIncrementallyAddClusters() throws IOException { + public void testIncrementallyAddClusters() throws IOException, InterruptedException { List knownNodes = new CopyOnWriteArrayList<>(); try (MockTransportService seedTransport = startTransport("cluster_1_node", knownNodes, Version.CURRENT); MockTransportService otherSeedTransport = startTransport("cluster_2_node", knownNodes, Version.CURRENT)) { @@ -192,7 +192,7 @@ public void testIncrementallyAddClusters() throws IOException { builder.putList("search.remote.cluster_2.seeds", otherSeedNode.getAddress().toString()); try (RemoteClusterService service = new RemoteClusterService(Settings.EMPTY, transportService)) { assertFalse(service.isCrossClusterSearchEnabled()); - service.initializeRemoteClusters(); + initializedRemoteClusterService(service); assertFalse(service.isCrossClusterSearchEnabled()); service.updateRemoteCluster("cluster_1", Collections.singletonList(seedNode.getAddress().address())); assertTrue(service.isCrossClusterSearchEnabled()); @@ -234,11 +234,7 @@ public void testRemoteNodeAttribute() throws IOException, InterruptedException { knownNodes.add(c2N2Node); Collections.shuffle(knownNodes, random()); - try (MockTransportService transportService = MockTransportService.createNewService( - settings, - Version.CURRENT, - threadPool, - null)) { + try (MockTransportService transportService = startTransport("test", Collections.emptyList(), Version.CURRENT, settings)) { transportService.start(); transportService.acceptIncomingRequests(); final Settings.Builder builder = Settings.builder(); @@ -249,7 +245,7 @@ public void testRemoteNodeAttribute() throws IOException, InterruptedException { try (RemoteClusterService service = new RemoteClusterService(settings, transportService)) { assertFalse(service.isCrossClusterSearchEnabled()); - service.initializeRemoteClusters(); + initializedRemoteClusterService(service); assertFalse(service.isCrossClusterSearchEnabled()); final InetSocketAddress c1N1Address = c1N1Node.getAddress().address(); @@ -306,11 +302,7 @@ public void testRemoteNodeRoles() throws IOException, InterruptedException { knownNodes.add(c2N2Node); Collections.shuffle(knownNodes, random()); - try (MockTransportService transportService = MockTransportService.createNewService( - settings, - Version.CURRENT, - threadPool, - null)) { + try (MockTransportService transportService = startTransport("test", Collections.emptyList(), Version.CURRENT, settings)) { transportService.start(); transportService.acceptIncomingRequests(); final Settings.Builder builder = Settings.builder(); @@ -318,7 +310,7 @@ public void testRemoteNodeRoles() throws IOException, InterruptedException { builder.putList("search.remote.cluster_2.seeds", c2N1Node.getAddress().toString()); try (RemoteClusterService service = new RemoteClusterService(settings, transportService)) { assertFalse(service.isCrossClusterSearchEnabled()); - service.initializeRemoteClusters(); + initializedRemoteClusterService(service); assertFalse(service.isCrossClusterSearchEnabled()); final InetSocketAddress c1N1Address = c1N1Node.getAddress().address(); @@ -353,7 +345,7 @@ public void testRemoteNodeRoles() throws IOException, InterruptedException { } private ActionListener connectionListener(final CountDownLatch latch) { - return ActionListener.wrap(x -> latch.countDown(), x -> fail()); + return ActionListener.wrap(x -> latch.countDown(), x -> {latch.countDown(); throw new AssertionError(x);}); } public void testCollectNodes() throws InterruptedException, IOException { @@ -380,11 +372,7 @@ public void testCollectNodes() throws InterruptedException, IOException { Collections.shuffle(knownNodes_c1, random()); Collections.shuffle(knownNodes_c2, random()); - try (MockTransportService transportService = MockTransportService.createNewService( - settings, - Version.CURRENT, - threadPool, - null)) { + try (MockTransportService transportService = startTransport("test", Collections.emptyList(), Version.CURRENT, settings)) { transportService.start(); transportService.acceptIncomingRequests(); final Settings.Builder builder = Settings.builder(); @@ -395,7 +383,7 @@ public void testCollectNodes() throws InterruptedException, IOException { try (RemoteClusterService service = new RemoteClusterService(settings, transportService)) { assertFalse(service.isCrossClusterSearchEnabled()); - service.initializeRemoteClusters(); + initializedRemoteClusterService(service); assertFalse(service.isCrossClusterSearchEnabled()); final InetSocketAddress c1N1Address = c1N1Node.getAddress().address(); @@ -534,12 +522,12 @@ public void testCollectSearchShards() throws Exception { Settings settings = builder.build(); try { - try (MockTransportService service = MockTransportService.createNewService(Settings.EMPTY, Version.CURRENT, threadPool, null)) { + try (MockTransportService service = startTransport("test", Collections.emptyList(), Version.CURRENT, Settings.EMPTY)) { service.start(); service.acceptIncomingRequests(); try (RemoteClusterService remoteClusterService = new RemoteClusterService(settings, service)) { assertFalse(remoteClusterService.isCrossClusterSearchEnabled()); - remoteClusterService.initializeRemoteClusters(); + initializedRemoteClusterService(remoteClusterService); assertTrue(remoteClusterService.isCrossClusterSearchEnabled()); { final CountDownLatch latch = new CountDownLatch(1); @@ -678,6 +666,13 @@ public void onNodeDisconnected(DiscoveryNode node) { } } + private void initializedRemoteClusterService(RemoteClusterService remoteClusterService) throws InterruptedException { + CountDownLatch latch = new CountDownLatch(1); + remoteClusterService.initializeRemoteClusters(randomBoolean() ? ClusterName.DEFAULT.value() : + randomRealisticUnicodeOfLengthBetween(1, 5), Settings.EMPTY, connectionListener(latch)); + latch.await(); + } + public void testRemoteClusterSkipIfDisconnectedSetting() { { Settings settings = Settings.builder() diff --git a/test/framework/src/main/java/org/elasticsearch/transport/MockTcpTransport.java b/test/framework/src/main/java/org/elasticsearch/transport/MockTcpTransport.java index 8831c46c01136..02ae1c8307b06 100644 --- a/test/framework/src/main/java/org/elasticsearch/transport/MockTcpTransport.java +++ b/test/framework/src/main/java/org/elasticsearch/transport/MockTcpTransport.java @@ -172,7 +172,6 @@ protected MockChannel initiateChannel(InetSocketAddress address, ActionListener< if (success == false) { IOUtils.close(socket); } - } executor.submit(() -> {