diff --git a/server/src/main/java/org/elasticsearch/common/settings/ClusterSettings.java b/server/src/main/java/org/elasticsearch/common/settings/ClusterSettings.java index bf53a3dc01a7a..237fc911db627 100644 --- a/server/src/main/java/org/elasticsearch/common/settings/ClusterSettings.java +++ b/server/src/main/java/org/elasticsearch/common/settings/ClusterSettings.java @@ -272,6 +272,7 @@ public void apply(Settings value, Settings current, Settings previous) { ElectMasterService.DISCOVERY_ZEN_MINIMUM_MASTER_NODES_SETTING, TransportSearchAction.SHARD_COUNT_LIMIT_SETTING, RemoteClusterAware.REMOTE_CLUSTERS_SEEDS, + RemoteClusterAware.REMOTE_CLUSTERS_PROXY, RemoteClusterService.REMOTE_CLUSTER_SKIP_UNAVAILABLE, RemoteClusterService.REMOTE_CONNECTIONS_PER_CLUSTER, RemoteClusterService.REMOTE_INITIAL_CONNECTION_TIMEOUT_SETTING, diff --git a/server/src/main/java/org/elasticsearch/common/settings/Setting.java b/server/src/main/java/org/elasticsearch/common/settings/Setting.java index b98c2753d701b..7b432c0ed1e18 100644 --- a/server/src/main/java/org/elasticsearch/common/settings/Setting.java +++ b/server/src/main/java/org/elasticsearch/common/settings/Setting.java @@ -1009,6 +1009,10 @@ public static Setting simpleString(String key, Property... properties) { return new Setting<>(key, s -> "", Function.identity(), properties); } + public static Setting simpleString(String key, Function parser, Property... properties) { + return new Setting<>(key, s -> "", parser, properties); + } + public static Setting simpleString(String key, Setting fallback, Property... properties) { return new Setting<>(key, fallback, Function.identity(), properties); } diff --git a/server/src/main/java/org/elasticsearch/transport/RemoteClusterAware.java b/server/src/main/java/org/elasticsearch/transport/RemoteClusterAware.java index a12f27c93e3c4..16d3c292bfe32 100644 --- a/server/src/main/java/org/elasticsearch/transport/RemoteClusterAware.java +++ b/server/src/main/java/org/elasticsearch/transport/RemoteClusterAware.java @@ -18,10 +18,14 @@ */ package org.elasticsearch.transport; +import java.util.EnumSet; import java.util.function.Supplier; import org.elasticsearch.Version; import org.elasticsearch.cluster.metadata.ClusterNameExpressionResolver; import org.elasticsearch.cluster.node.DiscoveryNode; +import org.elasticsearch.common.Strings; +import org.elasticsearch.common.UUIDs; +import org.elasticsearch.common.collect.Tuple; import org.elasticsearch.common.component.AbstractComponent; import org.elasticsearch.common.settings.ClusterSettings; import org.elasticsearch.common.settings.Setting; @@ -66,6 +70,22 @@ public abstract class RemoteClusterAware extends AbstractComponent { public static final char REMOTE_CLUSTER_INDEX_SEPARATOR = ':'; public static final String LOCAL_CLUSTER_GROUP_KEY = ""; + /** + * A proxy address for the remote cluster. + * NOTE: this setting is undocumented until we have at least one transport that supports passing on the hostname via a mechanism like SNI.
+ */ + public static final Setting.AffixSetting REMOTE_CLUSTERS_PROXY = Setting.affixKeySetting( + "search.remote.", + "proxy", + key -> Setting.simpleString(key, s -> { + if (Strings.hasLength(s)) { + parsePort(s); + } + return s; + }, Setting.Property.NodeScope, Setting.Property.Dynamic), REMOTE_CLUSTERS_SEEDS); + + protected final ClusterNameExpressionResolver clusterNameResolver; /** @@ -77,25 +97,42 @@ protected RemoteClusterAware(Settings settings) { this.clusterNameResolver = new ClusterNameExpressionResolver(settings); } - protected static Map>> buildRemoteClustersSeeds(Settings settings) { + /** + * Builds the dynamic per-cluster config from the given settings. This is a map keyed by the cluster alias that points to a tuple + * (ProxyAddress, [SeedNodeSuppliers]). If a cluster is configured with a proxy address, all seed nodes will point to + * {@link TransportAddress#META_ADDRESS} and their configured address will be used as the hostname for the generated discovery node. + */ + protected static Map>>> buildRemoteClustersDynamicConfig(Settings settings) { Stream>> allConcreteSettings = REMOTE_CLUSTERS_SEEDS.getAllConcreteSettings(settings); return allConcreteSettings.collect( Collectors.toMap(REMOTE_CLUSTERS_SEEDS::getNamespace, concreteSetting -> { String clusterName = REMOTE_CLUSTERS_SEEDS.getNamespace(concreteSetting); List addresses = concreteSetting.get(settings); + final boolean proxyMode = REMOTE_CLUSTERS_PROXY.getConcreteSettingForNamespace(clusterName).exists(settings); List> nodes = new ArrayList<>(addresses.size()); for (String address : addresses) { - nodes.add(() -> { - TransportAddress transportAddress = new TransportAddress(RemoteClusterAware.parseSeedAddress(address)); - return new DiscoveryNode(clusterName + "#" + transportAddress.toString(), - transportAddress, - Version.CURRENT.minimumCompatibilityVersion()); - }); + nodes.add(() -> buildSeedNode(clusterName, address, proxyMode)); } - return nodes; + return new Tuple<>(REMOTE_CLUSTERS_PROXY.getConcreteSettingForNamespace(clusterName).get(settings), nodes); })); } + static DiscoveryNode buildSeedNode(String clusterName, String address, boolean proxyMode) { + if (proxyMode) { + TransportAddress transportAddress = new TransportAddress(TransportAddress.META_ADDRESS, 0); + String hostName = address.substring(0, indexOfPortSeparator(address)); + return new DiscoveryNode("", clusterName + "#" + address, UUIDs.randomBase64UUID(), hostName, address, + transportAddress, Collections + .emptyMap(), EnumSet.allOf(DiscoveryNode.Role.class), + Version.CURRENT.minimumCompatibilityVersion()); + } else { + TransportAddress transportAddress = new TransportAddress(RemoteClusterAware.parseSeedAddress(address)); + return new DiscoveryNode(clusterName + "#" + transportAddress.toString(), + transportAddress, + Version.CURRENT.minimumCompatibilityVersion()); + } + } + /** * Groups indices per cluster by splitting remote cluster-alias, index-name pairs on {@link #REMOTE_CLUSTER_INDEX_SEPARATOR}. All * indices per cluster are collected as a list in the returned map keyed by the cluster alias. Local indices are grouped under @@ -138,20 +175,24 @@ public Map> groupClusterIndices(String[] requestIndices, Pr protected abstract Set getRemoteClusterNames(); + /** * Subclasses must implement this to receive information about updated cluster aliases. If the given address list is * empty the cluster alias is unregistered and should be removed.
*/ - protected abstract void updateRemoteCluster(String clusterAlias, List addresses); + protected abstract void updateRemoteCluster(String clusterAlias, List addresses, String proxy); /** * Registers this instance to listen to updates on the cluster settings. */ public void listenForUpdates(ClusterSettings clusterSettings) { - clusterSettings.addAffixUpdateConsumer(RemoteClusterAware.REMOTE_CLUSTERS_SEEDS, this::updateRemoteCluster, + clusterSettings.addAffixUpdateConsumer(RemoteClusterAware.REMOTE_CLUSTERS_PROXY, + RemoteClusterAware.REMOTE_CLUSTERS_SEEDS, + (key, value) -> updateRemoteCluster(key, value.v2(), value.v1()), (namespace, value) -> {}); } + protected static InetSocketAddress parseSeedAddress(String remoteHost) { String host = remoteHost.substring(0, indexOfPortSeparator(remoteHost)); InetAddress hostAddress; diff --git a/server/src/main/java/org/elasticsearch/transport/RemoteClusterConnection.java b/server/src/main/java/org/elasticsearch/transport/RemoteClusterConnection.java index 5621b38557814..6b1909434655f 100644 --- a/server/src/main/java/org/elasticsearch/transport/RemoteClusterConnection.java +++ b/server/src/main/java/org/elasticsearch/transport/RemoteClusterConnection.java @@ -18,6 +18,7 @@ */ package org.elasticsearch.transport; +import java.net.InetSocketAddress; import java.util.function.Supplier; import org.apache.logging.log4j.message.ParameterizedMessage; import org.apache.lucene.store.AlreadyClosedException; @@ -88,6 +89,7 @@ final class RemoteClusterConnection extends AbstractComponent implements Transpo private final int maxNumRemoteConnections; private final Predicate nodePredicate; private final ThreadPool threadPool; + private volatile String proxyAddress; private volatile List> seedNodes; private volatile boolean skipUnavailable; private final ConnectHandler connectHandler; @@ -106,6 +108,13 @@ final class RemoteClusterConnection extends AbstractComponent implements Transpo RemoteClusterConnection(Settings settings, String clusterAlias, List> seedNodes, TransportService transportService, ConnectionManager connectionManager, int maxNumRemoteConnections, Predicate nodePredicate) { + this(settings, clusterAlias, seedNodes, transportService, connectionManager, maxNumRemoteConnections, nodePredicate, null); + } + + RemoteClusterConnection(Settings settings, String clusterAlias, List> seedNodes, + TransportService transportService, ConnectionManager connectionManager, int maxNumRemoteConnections, Predicate + nodePredicate, + String proxyAddress) { super(settings); this.transportService = transportService; this.maxNumRemoteConnections = maxNumRemoteConnections; @@ -130,13 +139,26 @@ final class RemoteClusterConnection extends AbstractComponent implements Transpo connectionManager.addListener(this); // we register the transport service here as a listener to make sure we notify handlers on disconnect etc. 
connectionManager.addListener(transportService); + this.proxyAddress = proxyAddress; + } + + private static DiscoveryNode maybeAddProxyAddress(String proxyAddress, DiscoveryNode node) { + if (proxyAddress == null || proxyAddress.isEmpty()) { + return node; + } else { + // resolve the proxy address lazily here + InetSocketAddress proxyInetAddress = RemoteClusterAware.parseSeedAddress(proxyAddress); + return new DiscoveryNode(node.getName(), node.getId(), node.getEphemeralId(), node.getHostName(), node + .getHostAddress(), new TransportAddress(proxyInetAddress), node.getAttributes(), node.getRoles(), node.getVersion()); + } } /** * Updates the list of seed nodes for this cluster connection */ - synchronized void updateSeedNodes(List> seedNodes, ActionListener connectListener) { + synchronized void updateSeedNodes(String proxyAddress, List> seedNodes, ActionListener connectListener) { this.seedNodes = Collections.unmodifiableList(new ArrayList<>(seedNodes)); + this.proxyAddress = proxyAddress; connectHandler.connect(connectListener); } @@ -281,6 +303,7 @@ Transport.Connection getConnection(DiscoveryNode remoteClusterNode) { return new ProxyConnection(connection, remoteClusterNode); } + static final class ProxyConnection implements Transport.Connection { private final Transport.Connection proxyConnection; private final DiscoveryNode targetNode; @@ -461,7 +484,7 @@ private void collectRemoteNodes(Iterator> seedNodes, try { if (seedNodes.hasNext()) { cancellableThreads.executeIO(() -> { - final DiscoveryNode seedNode = seedNodes.next().get(); + final DiscoveryNode seedNode = maybeAddProxyAddress(proxyAddress, seedNodes.next().get()); final TransportService.HandshakeResponse handshakeResponse; Transport.Connection connection = manager.openConnection(seedNode, ConnectionProfile.buildSingleChannelProfile(TransportRequestOptions.Type.REG, null, null)); @@ -476,7 +499,7 @@ private void collectRemoteNodes(Iterator> seedNodes, throw ex; } - final DiscoveryNode handshakeNode = handshakeResponse.getDiscoveryNode(); + final DiscoveryNode handshakeNode = maybeAddProxyAddress(proxyAddress, handshakeResponse.getDiscoveryNode()); if (nodePredicate.test(handshakeNode) && connectedNodes.size() < maxNumRemoteConnections) { manager.connectToNode(handshakeNode, remoteProfile, transportService.connectionValidator(handshakeNode)); if (remoteClusterName.get() == null) { @@ -583,7 +606,8 @@ public void handleResponse(ClusterStateResponse response) { cancellableThreads.executeIO(() -> { DiscoveryNodes nodes = response.getState().nodes(); Iterable nodesIter = nodes.getNodes()::valuesIt; - for (DiscoveryNode node : nodesIter) { + for (DiscoveryNode n : nodesIter) { + DiscoveryNode node = maybeAddProxyAddress(proxyAddress, n); if (nodePredicate.test(node) && connectedNodes.size() < maxNumRemoteConnections) { try { connectionManager.connectToNode(node, remoteProfile, @@ -646,7 +670,8 @@ void addConnectedNode(DiscoveryNode node) { * Get the information about remote nodes to be rendered on {@code _remote/info} requests.
*/ public RemoteConnectionInfo getConnectionInfo() { - List seedNodeAddresses = seedNodes.stream().map(node -> node.get().getAddress()).collect(Collectors.toList()); + List seedNodeAddresses = seedNodes.stream().map(node -> node.get().getAddress()).collect + (Collectors.toList()); TimeValue initialConnectionTimeout = RemoteClusterService.REMOTE_INITIAL_CONNECTION_TIMEOUT_SETTING.get(settings); return new RemoteConnectionInfo(clusterAlias, seedNodeAddresses, maxNumRemoteConnections, connectedNodes.size(), initialConnectionTimeout, skipUnavailable); diff --git a/server/src/main/java/org/elasticsearch/transport/RemoteClusterService.java b/server/src/main/java/org/elasticsearch/transport/RemoteClusterService.java index 34f13b672874f..60126847cbea9 100644 --- a/server/src/main/java/org/elasticsearch/transport/RemoteClusterService.java +++ b/server/src/main/java/org/elasticsearch/transport/RemoteClusterService.java @@ -31,10 +31,10 @@ import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.common.Booleans; import org.elasticsearch.common.Strings; +import org.elasticsearch.common.collect.Tuple; import org.elasticsearch.common.settings.ClusterSettings; import org.elasticsearch.common.settings.Setting; import org.elasticsearch.common.settings.Settings; -import org.elasticsearch.common.transport.TransportAddress; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.util.concurrent.CountDown; import org.elasticsearch.core.internal.io.IOUtils; @@ -116,8 +116,8 @@ public final class RemoteClusterService extends RemoteClusterAware implements Cl * @param seeds a cluster alias to discovery node mapping representing the remote clusters seeds nodes * @param connectionListener a listener invoked once every configured cluster has been connected to */ - private synchronized void updateRemoteClusters(Map>> seeds, - ActionListener connectionListener) { + private synchronized void updateRemoteClusters(Map>>> seeds, + ActionListener connectionListener) { if (seeds.containsKey(LOCAL_CLUSTER_GROUP_KEY)) { throw new IllegalArgumentException("remote clusters must not have the empty string as its key"); } @@ -127,9 +127,12 @@ private synchronized void updateRemoteClusters(Map>> entry : seeds.entrySet()) { + for (Map.Entry>>> entry : seeds.entrySet()) { + List> seedList = entry.getValue().v2(); + String proxyAddress = entry.getValue().v1(); + RemoteClusterConnection remote = this.remoteClusters.get(entry.getKey()); - if (entry.getValue().isEmpty()) { // with no seed nodes we just remove the connection + if (seedList.isEmpty()) { // with no seed nodes we just remove the connection try { IOUtils.close(remote); } catch (IOException e) { @@ -140,15 +143,15 @@ private synchronized void updateRemoteClusters(Map { if (countDown.countDown()) { connectionListener.onResponse(response); @@ -302,8 +305,7 @@ protected Set getRemoteClusterNames() { @Override public void listenForUpdates(ClusterSettings clusterSettings) { super.listenForUpdates(clusterSettings); - clusterSettings.addAffixUpdateConsumer(REMOTE_CLUSTER_SKIP_UNAVAILABLE, this::updateSkipUnavailable, - (clusterAlias, value) -> {}); + clusterSettings.addAffixUpdateConsumer(REMOTE_CLUSTER_SKIP_UNAVAILABLE, this::updateSkipUnavailable, (alias, value) -> {}); } synchronized void updateSkipUnavailable(String clusterAlias, Boolean skipUnavailable) { @@ -313,22 +315,21 @@ synchronized void updateSkipUnavailable(String clusterAlias, Boolean skipUnavail } } + @Override - protected void updateRemoteCluster(String clusterAlias, List 
addresses) { - updateRemoteCluster(clusterAlias, addresses, ActionListener.wrap((x) -> {}, (x) -> {})); + protected void updateRemoteCluster(String clusterAlias, List addresses, String proxyAddress) { + updateRemoteCluster(clusterAlias, addresses, proxyAddress, ActionListener.wrap((x) -> {}, (x) -> {})); } void updateRemoteCluster( final String clusterAlias, final List addresses, + final String proxyAddress, final ActionListener connectionListener) { - final List> nodes = addresses.stream().>map(address -> () -> { - final TransportAddress transportAddress = new TransportAddress(RemoteClusterAware.parseSeedAddress(address)); - final String id = clusterAlias + "#" + transportAddress.toString(); - final Version version = Version.CURRENT.minimumCompatibilityVersion(); - return new DiscoveryNode(id, transportAddress, version); - }).collect(Collectors.toList()); - updateRemoteClusters(Collections.singletonMap(clusterAlias, nodes), connectionListener); + final List> nodes = addresses.stream().>map(address -> () -> + buildSeedNode(clusterAlias, address, Strings.hasLength(proxyAddress)) + ).collect(Collectors.toList()); + updateRemoteClusters(Collections.singletonMap(clusterAlias, new Tuple<>(proxyAddress, nodes)), connectionListener); } /** @@ -338,7 +339,7 @@ void updateRemoteCluster( void initializeRemoteClusters() { final TimeValue timeValue = REMOTE_INITIAL_CONNECTION_TIMEOUT_SETTING.get(settings); final PlainActionFuture future = new PlainActionFuture<>(); - Map>> seeds = RemoteClusterAware.buildRemoteClustersSeeds(settings); + Map>>> seeds = RemoteClusterAware.buildRemoteClustersDynamicConfig(settings); updateRemoteClusters(seeds, future); try { future.get(timeValue.millis(), TimeUnit.MILLISECONDS); diff --git a/server/src/test/java/org/elasticsearch/transport/RemoteClusterConnectionTests.java b/server/src/test/java/org/elasticsearch/transport/RemoteClusterConnectionTests.java index e40486d63dc40..88b01c66898a0 100644 --- a/server/src/test/java/org/elasticsearch/transport/RemoteClusterConnectionTests.java +++ b/server/src/test/java/org/elasticsearch/transport/RemoteClusterConnectionTests.java @@ -18,6 +18,8 @@ */ package org.elasticsearch.transport; +import java.util.HashMap; +import java.util.Map; import java.util.function.Supplier; import org.apache.lucene.store.AlreadyClosedException; import org.elasticsearch.Version; @@ -52,6 +54,7 @@ import org.elasticsearch.test.ESTestCase; import org.elasticsearch.test.VersionUtils; import org.elasticsearch.test.transport.MockTransportService; +import org.elasticsearch.test.transport.StubbableTransport; import org.elasticsearch.threadpool.TestThreadPool; import org.elasticsearch.threadpool.ThreadPool; @@ -378,15 +381,19 @@ public void testFilterDiscoveredNodes() throws Exception { } } } - private void updateSeedNodes(RemoteClusterConnection connection, List> seedNodes) throws Exception { + updateSeedNodes(connection, seedNodes, null); + } + + private void updateSeedNodes(RemoteClusterConnection connection, List> seedNodes, String proxyAddress) + throws Exception { CountDownLatch latch = new CountDownLatch(1); AtomicReference exceptionAtomicReference = new AtomicReference<>(); ActionListener listener = ActionListener.wrap(x -> latch.countDown(), x -> { exceptionAtomicReference.set(x); latch.countDown(); }); - connection.updateSeedNodes(seedNodes, listener); + connection.updateSeedNodes(proxyAddress, seedNodes, listener); latch.await(); if (exceptionAtomicReference.get() != null) { throw exceptionAtomicReference.get(); @@ -517,7 +524,7 @@ public void 
run() { exceptionReference.set(x); listenerCalled.countDown(); }); - connection.updateSeedNodes(Arrays.asList(() -> seedNode), listener); + connection.updateSeedNodes(null, Arrays.asList(() -> seedNode), listener); acceptedLatch.await(); connection.close(); // now close it, this should trigger an interrupt on the socket and we can move on assertTrue(connection.assertNoRunningConnections()); @@ -787,7 +794,7 @@ public void run() { throw new AssertionError(x); } }); - connection.updateSeedNodes(seedNodes, listener); + connection.updateSeedNodes(null, seedNodes, listener); } latch.await(); } catch (Exception ex) { @@ -875,7 +882,7 @@ public void run() { } }); try { - connection.updateSeedNodes(seedNodes, listener); + connection.updateSeedNodes(null, seedNodes, listener); } catch (Exception e) { // it's ok if we're shutting down assertThat(e.getMessage(), containsString("threadcontext is already closed")); @@ -1384,4 +1391,97 @@ public void testLazyResolveTransportAddress() throws Exception { } } } + + public void testProxyMode() throws Exception { + List knownNodes = new CopyOnWriteArrayList<>(); + try (MockTransportService seedTransport = startTransport("node_0", knownNodes, Version.CURRENT); + MockTransportService discoverableTransport = startTransport("node_1", knownNodes, Version.CURRENT)) { + knownNodes.add(seedTransport.getLocalDiscoNode()); + knownNodes.add(discoverableTransport.getLocalDiscoNode()); + Collections.shuffle(knownNodes, random()); + final String proxyAddress = "1.1.1.1:99"; + Map nodes = new HashMap<>(); + nodes.put("node_0", seedTransport.getLocalDiscoNode()); + nodes.put("node_1", discoverableTransport.getLocalDiscoNode()); + Transport mockTcpTransport = getProxyTransport(threadPool, Collections.singletonMap(proxyAddress, nodes)); + try (MockTransportService service = MockTransportService.createNewService(Settings.EMPTY, mockTcpTransport, Version.CURRENT, + threadPool, null, Collections.emptySet())) { + service.start(); + service.acceptIncomingRequests(); + Supplier seedSupplier = () -> + RemoteClusterAware.buildSeedNode("some-remote-cluster", "node_0:" + randomIntBetween(1, 10000), true); + try (RemoteClusterConnection connection = new RemoteClusterConnection(Settings.EMPTY, "test-cluster", + Arrays.asList(seedSupplier), service, service.getConnectionManager(), Integer.MAX_VALUE, n -> true, proxyAddress)) { + updateSeedNodes(connection, Arrays.asList(seedSupplier), proxyAddress); + assertEquals(2, connection.getNumNodesConnected()); + assertNotNull(connection.getConnection(discoverableTransport.getLocalDiscoNode())); + assertNotNull(connection.getConnection(seedTransport.getLocalDiscoNode())); + assertEquals(proxyAddress, connection.getConnection(seedTransport.getLocalDiscoNode()) + .getNode().getAddress().toString()); + assertEquals(proxyAddress, connection.getConnection(discoverableTransport.getLocalDiscoNode()) + .getNode().getAddress().toString()); + service.getConnectionManager().disconnectFromNode(knownNodes.get(0)); + // ensure we reconnect + assertBusy(() -> { + assertEquals(2, connection.getNumNodesConnected()); + }); + discoverableTransport.close(); + seedTransport.close(); + } + } + } + } + + public static Transport getProxyTransport(ThreadPool threadPool, Map> nodeMap) { + if (nodeMap.isEmpty()) { + throw new IllegalArgumentException("nodeMap must be non-empty"); + } + + StubbableTransport stubbableTransport = new StubbableTransport(MockTransportService.newMockTransport(Settings.EMPTY, Version + .CURRENT, threadPool)); + 
stubbableTransport.setDefaultConnectBehavior((t, node, profile) -> { + Map proxyMapping = nodeMap.get(node.getAddress().toString()); + if (proxyMapping == null) { + throw new IllegalStateException("no proxy mapping for node: " + node); + } + DiscoveryNode proxyNode = proxyMapping.get(node.getName()); + if (proxyNode == null) { + // this is a seednode - lets pick one randomly + assertEquals("seed node must not have a port in the hostname: " + node.getHostName(), + -1, node.getHostName().lastIndexOf(':')); + assertTrue("missing hostname: " + node, proxyMapping.containsKey(node.getHostName())); + // route by seed hostname + proxyNode = proxyMapping.get(node.getHostName()); + } + Transport.Connection connection = t.openConnection(proxyNode, profile); + return new Transport.Connection() { + @Override + public DiscoveryNode getNode() { + return node; + } + + @Override + public void sendRequest(long requestId, String action, TransportRequest request, TransportRequestOptions options) + throws IOException, TransportException { + connection.sendRequest(requestId, action, request, options); + } + + @Override + public void addCloseListener(ActionListener listener) { + connection.addCloseListener(listener); + } + + @Override + public boolean isClosed() { + return connection.isClosed(); + } + + @Override + public void close() { + connection.close(); + } + }; + }); + return stubbableTransport; + } } diff --git a/server/src/test/java/org/elasticsearch/transport/RemoteClusterServiceTests.java b/server/src/test/java/org/elasticsearch/transport/RemoteClusterServiceTests.java index 84a6ce54d1ed1..9d42b4e458dbe 100644 --- a/server/src/test/java/org/elasticsearch/transport/RemoteClusterServiceTests.java +++ b/server/src/test/java/org/elasticsearch/transport/RemoteClusterServiceTests.java @@ -26,6 +26,7 @@ import org.elasticsearch.action.admin.cluster.shards.ClusterSearchShardsResponse; import org.elasticsearch.action.support.IndicesOptions; import org.elasticsearch.cluster.node.DiscoveryNode; +import org.elasticsearch.common.collect.Tuple; import org.elasticsearch.common.settings.AbstractScopedSettings; import org.elasticsearch.common.settings.ClusterSettings; import org.elasticsearch.common.settings.Settings; @@ -55,6 +56,7 @@ import java.util.concurrent.atomic.AtomicReference; import java.util.function.BiFunction; import java.util.function.Predicate; +import java.util.stream.Collectors; import static org.hamcrest.CoreMatchers.containsString; import static org.hamcrest.CoreMatchers.instanceOf; @@ -115,25 +117,38 @@ public void testRemoteClusterSeedSetting() { assertEquals("failed to parse port", e.getMessage()); } - public void testBuiltRemoteClustersSeeds() throws Exception { - Map>> map = RemoteClusterService.buildRemoteClustersSeeds( - Settings.builder().put("search.remote.foo.seeds", "192.168.0.1:8080").put("search.remote.bar.seeds", "[::1]:9090").build()); - assertEquals(2, map.size()); + public void testBuildRemoteClustersDynamicConfig() throws Exception { + Map>>> map = RemoteClusterService.buildRemoteClustersDynamicConfig( + Settings.builder().put("search.remote.foo.seeds", "192.168.0.1:8080") + .put("search.remote.bar.seeds", "[::1]:9090") + .put("search.remote.boom.seeds", "boom-node1.internal:1000") + .put("search.remote.boom.proxy", "foo.bar.com:1234").build()); + assertEquals(3, map.size()); assertTrue(map.containsKey("foo")); assertTrue(map.containsKey("bar")); - assertEquals(1, map.get("foo").size()); - assertEquals(1, map.get("bar").size()); - - DiscoveryNode foo = map.get("foo").get(0).get(); 
+ assertTrue(map.containsKey("boom")); + assertEquals(1, map.get("foo").v2().size()); + assertEquals(1, map.get("bar").v2().size()); + assertEquals(1, map.get("boom").v2().size()); + DiscoveryNode foo = map.get("foo").v2().get(0).get(); + assertEquals("", map.get("foo").v1()); assertEquals(foo.getAddress(), new TransportAddress(new InetSocketAddress(InetAddress.getByName("192.168.0.1"), 8080))); assertEquals(foo.getId(), "foo#192.168.0.1:8080"); assertEquals(foo.getVersion(), Version.CURRENT.minimumCompatibilityVersion()); - DiscoveryNode bar = map.get("bar").get(0).get(); + DiscoveryNode bar = map.get("bar").v2().get(0).get(); assertEquals(bar.getAddress(), new TransportAddress(new InetSocketAddress(InetAddress.getByName("[::1]"), 9090))); assertEquals(bar.getId(), "bar#[::1]:9090"); + assertEquals("", map.get("bar").v1()); assertEquals(bar.getVersion(), Version.CURRENT.minimumCompatibilityVersion()); + + DiscoveryNode boom = map.get("boom").v2().get(0).get(); + assertEquals(boom.getAddress(), new TransportAddress(TransportAddress.META_ADDRESS, 0)); + assertEquals("boom-node1.internal", boom.getHostName()); + assertEquals(boom.getId(), "boom#boom-node1.internal:1000"); + assertEquals("foo.bar.com:1234", map.get("boom").v1()); + assertEquals(boom.getVersion(), Version.CURRENT.minimumCompatibilityVersion()); } @@ -204,17 +219,17 @@ public void testIncrementallyAddClusters() throws IOException { assertFalse(service.isCrossClusterSearchEnabled()); service.initializeRemoteClusters(); assertFalse(service.isCrossClusterSearchEnabled()); - service.updateRemoteCluster("cluster_1", Collections.singletonList(seedNode.getAddress().toString())); + service.updateRemoteCluster("cluster_1", Collections.singletonList(seedNode.getAddress().toString()), null); assertTrue(service.isCrossClusterSearchEnabled()); assertTrue(service.isRemoteClusterRegistered("cluster_1")); - service.updateRemoteCluster("cluster_2", Collections.singletonList(otherSeedNode.getAddress().toString())); + service.updateRemoteCluster("cluster_2", Collections.singletonList(otherSeedNode.getAddress().toString()), null); assertTrue(service.isCrossClusterSearchEnabled()); assertTrue(service.isRemoteClusterRegistered("cluster_1")); assertTrue(service.isRemoteClusterRegistered("cluster_2")); - service.updateRemoteCluster("cluster_2", Collections.emptyList()); + service.updateRemoteCluster("cluster_2", Collections.emptyList(), null); assertFalse(service.isRemoteClusterRegistered("cluster_2")); IllegalArgumentException iae = expectThrows(IllegalArgumentException.class, - () -> service.updateRemoteCluster(RemoteClusterAware.LOCAL_CLUSTER_GROUP_KEY, Collections.emptyList())); + () -> service.updateRemoteCluster(RemoteClusterAware.LOCAL_CLUSTER_GROUP_KEY, Collections.emptyList(), null)); assertEquals("remote clusters must not have the empty string as its key", iae.getMessage()); } } @@ -265,14 +280,14 @@ public void testRemoteNodeAttribute() throws IOException, InterruptedException { final CountDownLatch firstLatch = new CountDownLatch(1); service.updateRemoteCluster( "cluster_1", - Arrays.asList(c1N1Node.getAddress().toString(), c1N2Node.getAddress().toString()), + Arrays.asList(c1N1Node.getAddress().toString(), c1N2Node.getAddress().toString()), null, connectionListener(firstLatch)); firstLatch.await(); final CountDownLatch secondLatch = new CountDownLatch(1); service.updateRemoteCluster( "cluster_2", - Arrays.asList(c2N1Node.getAddress().toString(), c2N2Node.getAddress().toString()), + Arrays.asList(c2N1Node.getAddress().toString(), 
c2N2Node.getAddress().toString()), null, connectionListener(secondLatch)); secondLatch.await(); @@ -330,14 +345,14 @@ public void testRemoteNodeRoles() throws IOException, InterruptedException { final CountDownLatch firstLatch = new CountDownLatch(1); service.updateRemoteCluster( "cluster_1", - Arrays.asList(c1N1Node.getAddress().toString(), c1N2Node.getAddress().toString()), + Arrays.asList(c1N1Node.getAddress().toString(), c1N2Node.getAddress().toString()), null, connectionListener(firstLatch)); firstLatch.await(); final CountDownLatch secondLatch = new CountDownLatch(1); service.updateRemoteCluster( "cluster_2", - Arrays.asList(c2N1Node.getAddress().toString(), c2N2Node.getAddress().toString()), + Arrays.asList(c2N1Node.getAddress().toString(), c2N2Node.getAddress().toString()), null, connectionListener(secondLatch)); secondLatch.await(); @@ -403,14 +418,14 @@ public void testCollectNodes() throws InterruptedException, IOException { final CountDownLatch firstLatch = new CountDownLatch(1); service.updateRemoteCluster( "cluster_1", - Arrays.asList(c1N1Node.getAddress().toString(), c1N2Node.getAddress().toString()), + Arrays.asList(c1N1Node.getAddress().toString(), c1N2Node.getAddress().toString()), null, connectionListener(firstLatch)); firstLatch.await(); final CountDownLatch secondLatch = new CountDownLatch(1); service.updateRemoteCluster( "cluster_2", - Arrays.asList(c2N1Node.getAddress().toString(), c2N2Node.getAddress().toString()), + Arrays.asList(c2N1Node.getAddress().toString(), c2N2Node.getAddress().toString()), null, connectionListener(secondLatch)); secondLatch.await(); CountDownLatch latch = new CountDownLatch(1); @@ -822,4 +837,76 @@ public void testGetNodePredicatesCombination() { assertTrue(nodePredicate.test(node)); } } + + public void testRemoteClusterWithProxy() throws Exception { + List knownNodes = new CopyOnWriteArrayList<>(); + try (MockTransportService cluster_1_node0 = startTransport("cluster_1_node0", knownNodes, Version.CURRENT); + MockTransportService cluster_1_node_1 = startTransport("cluster_1_node1", knownNodes, Version.CURRENT); + MockTransportService cluster_2_node0 = startTransport("cluster_2_node0", Collections.emptyList(), Version.CURRENT)) { + knownNodes.add(cluster_1_node0.getLocalDiscoNode()); + knownNodes.add(cluster_1_node_1.getLocalDiscoNode()); + String cluster1Proxy = "1.1.1.1:99"; + String cluster2Proxy = "2.2.2.2:99"; + Map nodesCluster1 = new HashMap<>(); + nodesCluster1.put("cluster_1_node0", cluster_1_node0.getLocalDiscoNode()); + nodesCluster1.put("cluster_1_node1", cluster_1_node_1.getLocalDiscoNode()); + Map> mapping = new HashMap<>(); + mapping.put(cluster1Proxy, nodesCluster1); + mapping.put(cluster2Proxy, Collections.singletonMap("cluster_2_node0", cluster_2_node0.getLocalDiscoNode())); + + Collections.shuffle(knownNodes, random()); + Transport proxyTransport = RemoteClusterConnectionTests.getProxyTransport(threadPool, mapping); + try (MockTransportService transportService = MockTransportService.createNewService(Settings.EMPTY, proxyTransport, + Version.CURRENT, threadPool, null, Collections.emptySet());) { + transportService.start(); + transportService.acceptIncomingRequests(); + Settings.Builder builder = Settings.builder(); + builder.putList("search.remote.cluster_1.seeds", "cluster_1_node0:8080"); + builder.put("search.remote.cluster_1.proxy", cluster1Proxy); + try (RemoteClusterService service = new RemoteClusterService(builder.build(), transportService)) { + assertFalse(service.isCrossClusterSearchEnabled()); + 
service.initializeRemoteClusters(); + assertTrue(service.isCrossClusterSearchEnabled()); + updateRemoteCluster(service, "cluster_1", Collections.singletonList("cluster_1_node1:8081"), cluster1Proxy); + assertTrue(service.isCrossClusterSearchEnabled()); + assertTrue(service.isRemoteClusterRegistered("cluster_1")); + assertFalse(service.isRemoteClusterRegistered("cluster_2")); + updateRemoteCluster(service, "cluster_2", Collections.singletonList("cluster_2_node0:9300"), cluster2Proxy); + assertTrue(service.isCrossClusterSearchEnabled()); + assertTrue(service.isRemoteClusterRegistered("cluster_1")); + assertTrue(service.isRemoteClusterRegistered("cluster_2")); + List infos = service.getRemoteConnectionInfos().collect(Collectors.toList()); + for (RemoteConnectionInfo info : infos) { + switch (info.clusterAlias) { + case "cluster_1": + assertEquals(2, info.numNodesConnected); + break; + case "cluster_2": + assertEquals(1, info.numNodesConnected); + break; + default: + fail("unknown cluster: " + info.clusterAlias); + } + } + service.updateRemoteCluster("cluster_2", Collections.emptyList(), randomBoolean() ? cluster2Proxy : null); + assertFalse(service.isRemoteClusterRegistered("cluster_2")); + } + } + } + } + + private void updateRemoteCluster(RemoteClusterService service, String clusterAlias, List addresses, String proxyAddress) + throws Exception { + CountDownLatch latch = new CountDownLatch(1); + AtomicReference exceptionAtomicReference = new AtomicReference<>(); + ActionListener listener = ActionListener.wrap(x -> latch.countDown(), x -> { + exceptionAtomicReference.set(x); + latch.countDown(); + }); + service.updateRemoteCluster(clusterAlias, addresses, proxyAddress, listener); + latch.await(); + if (exceptionAtomicReference.get() != null) { + throw exceptionAtomicReference.get(); + } + } } diff --git a/test/framework/src/main/java/org/elasticsearch/test/transport/MockTransportService.java b/test/framework/src/main/java/org/elasticsearch/test/transport/MockTransportService.java index 15ab06d651e92..d6c4f30a885d5 100644 --- a/test/framework/src/main/java/org/elasticsearch/test/transport/MockTransportService.java +++ b/test/framework/src/main/java/org/elasticsearch/test/transport/MockTransportService.java @@ -95,6 +95,12 @@ public List> getSettings() { public static MockTransportService createNewService(Settings settings, Version version, ThreadPool threadPool, @Nullable ClusterSettings clusterSettings) { + MockTcpTransport mockTcpTransport = newMockTransport(settings, version, threadPool); + return createNewService(settings, mockTcpTransport, version, threadPool, clusterSettings, + Collections.emptySet()); + } + + public static MockTcpTransport newMockTransport(Settings settings, Version version, ThreadPool threadPool) { // some tests use MockTransportService to do network based testing. Yet, we run tests in multiple JVMs that means // concurrent tests could claim port that another JVM just released and if that test tries to simulate a disconnect it might // be smart enough to re-connect depending on what is tested. 
To reduce the risk, since this is very hard to debug we use @@ -102,9 +108,8 @@ public static MockTransportService createNewService(Settings settings, Version v int basePort = 10300 + (JVM_ORDINAL * 100); // use a non-default port otherwise some cluster in this JVM might reuse a port settings = Settings.builder().put(TcpTransport.PORT.getKey(), basePort + "-" + (basePort + 100)).put(settings).build(); NamedWriteableRegistry namedWriteableRegistry = new NamedWriteableRegistry(ClusterModule.getNamedWriteables()); - final Transport transport = new MockTcpTransport(settings, threadPool, BigArrays.NON_RECYCLING_INSTANCE, + return new MockTcpTransport(settings, threadPool, BigArrays.NON_RECYCLING_INSTANCE, new NoneCircuitBreakerService(), namedWriteableRegistry, new NetworkService(Collections.emptyList()), version); - return createNewService(settings, transport, version, threadPool, clusterSettings, Collections.emptySet()); } public static MockTransportService createNewService(Settings settings, Transport transport, Version version, ThreadPool threadPool, diff --git a/test/framework/src/main/java/org/elasticsearch/test/transport/StubbableTransport.java b/test/framework/src/main/java/org/elasticsearch/test/transport/StubbableTransport.java index 2e78f8a9a4f04..d35fe609c0855 100644 --- a/test/framework/src/main/java/org/elasticsearch/test/transport/StubbableTransport.java +++ b/test/framework/src/main/java/org/elasticsearch/test/transport/StubbableTransport.java @@ -41,7 +41,7 @@ import java.util.Map; import java.util.concurrent.ConcurrentHashMap; -public class StubbableTransport implements Transport { +public final class StubbableTransport implements Transport { private final ConcurrentHashMap sendBehaviors = new ConcurrentHashMap<>(); private final ConcurrentHashMap connectBehaviors = new ConcurrentHashMap<>(); @@ -60,6 +60,12 @@ boolean setDefaultSendBehavior(SendRequestBehavior sendBehavior) { return prior == null; } + public boolean setDefaultConnectBehavior(OpenConnectionBehavior openConnectionBehavior) { + OpenConnectionBehavior prior = this.defaultConnectBehavior; + this.defaultConnectBehavior = openConnectionBehavior; + return prior == null; + } + boolean addSendBehavior(TransportAddress transportAddress, SendRequestBehavior sendBehavior) { return sendBehaviors.put(transportAddress, sendBehavior) == null; } diff --git a/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/authz/IndicesAndAliasesResolver.java b/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/authz/IndicesAndAliasesResolver.java index 3cf2034cc74b8..34aed55bb2903 100644 --- a/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/authz/IndicesAndAliasesResolver.java +++ b/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/authz/IndicesAndAliasesResolver.java @@ -418,7 +418,7 @@ private static class RemoteClusterResolver extends RemoteClusterAware { private RemoteClusterResolver(Settings settings, ClusterSettings clusterSettings) { super(settings); - clusters = new CopyOnWriteArraySet<>(buildRemoteClustersSeeds(settings).keySet()); + clusters = new CopyOnWriteArraySet<>(buildRemoteClustersDynamicConfig(settings).keySet()); listenForUpdates(clusterSettings); } @@ -428,7 +428,7 @@ protected Set getRemoteClusterNames() { } @Override - protected void updateRemoteCluster(String clusterAlias, List addresses) { + protected void updateRemoteCluster(String clusterAlias, List addresses, String proxyAddress) { if (addresses.isEmpty()) { 
clusters.remove(clusterAlias); } else {
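
Usage sketch (illustrative, not part of the diff): the snippet below shows how the new search.remote.<alias>.proxy setting is expected to interact with seed-node construction. The alias cluster_one, the host names, and the class name are invented for this example, and the class is assumed to sit in the org.elasticsearch.transport package because buildSeedNode and buildRemoteClustersDynamicConfig are package-visible rather than public.

package org.elasticsearch.transport;

import java.util.List;
import java.util.Map;
import java.util.function.Supplier;

import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.collect.Tuple;
import org.elasticsearch.common.settings.Settings;

public class RemoteProxyModeSketch {
    public static void main(String[] args) {
        // Hypothetical per-cluster configuration: one seed node plus the new proxy setting.
        Settings settings = Settings.builder()
                .putList("search.remote.cluster_one.seeds", "node-1.internal:9300")
                .put("search.remote.cluster_one.proxy", "proxy.example.com:9400")
                .build();

        // buildRemoteClustersDynamicConfig groups this into alias -> (proxy address, seed node suppliers).
        Map<String, Tuple<String, List<Supplier<DiscoveryNode>>>> config =
                RemoteClusterAware.buildRemoteClustersDynamicConfig(settings);
        Tuple<String, List<Supplier<DiscoveryNode>>> clusterOne = config.get("cluster_one");
        System.out.println(clusterOne.v1()); // proxy.example.com:9400

        // Because a proxy is configured, the seed node is built in proxy mode: the configured address
        // stays visible as the node's host name (so a transport could later route on it, e.g. via SNI),
        // while the transport address is only the META_ADDRESS placeholder and is never dialed directly;
        // RemoteClusterConnection#maybeAddProxyAddress swaps in the proxy address at connection time.
        DiscoveryNode seed = clusterOne.v2().get(0).get();
        System.out.println(seed.getId());       // cluster_one#node-1.internal:9300
        System.out.println(seed.getHostName()); // node-1.internal
        System.out.println(seed.getAddress());  // 0.0.0.0:0 (META_ADDRESS placeholder)
    }
}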
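
In the same spirit, a small ESTestCase-style sketch of the eager validation performed by the new proxy setting; the test class name, alias, and endpoints are made up, but the behaviour follows the parser registered above (parsePort is applied to every non-empty value) and mirrors the existing "failed to parse port" assertion in RemoteClusterServiceTests.

package org.elasticsearch.transport;

import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.test.ESTestCase;

public class RemoteClusterProxySettingSketchTests extends ESTestCase {

    public void testProxySettingRequiresPort() {
        Setting<String> proxySetting =
                RemoteClusterAware.REMOTE_CLUSTERS_PROXY.getConcreteSettingForNamespace("cluster_one");

        // A well-formed host:port value passes through unchanged; since the setting is registered
        // with Setting.Property.Dynamic it can also be updated on a live cluster.
        assertEquals("proxy.example.com:9400", proxySetting.get(Settings.builder()
                .put("search.remote.cluster_one.proxy", "proxy.example.com:9400")
                .build()));

        // A non-empty value without a parsable port is rejected when the setting is read,
        // because the parser passed to Setting.simpleString calls parsePort on it.
        IllegalArgumentException e = expectThrows(IllegalArgumentException.class,
                () -> proxySetting.get(Settings.builder()
                        .put("search.remote.cluster_one.proxy", "proxy.example.com")
                        .build()));
        assertEquals("failed to parse port", e.getMessage());
    }
}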