Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add proxy support to RemoteClusterConnection #33062

Merged
merged 4 commits into from
Aug 25, 2018
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -272,6 +272,7 @@ public void apply(Settings value, Settings current, Settings previous) {
ElectMasterService.DISCOVERY_ZEN_MINIMUM_MASTER_NODES_SETTING,
TransportSearchAction.SHARD_COUNT_LIMIT_SETTING,
RemoteClusterAware.REMOTE_CLUSTERS_SEEDS,
RemoteClusterAware.REMOTE_CLUSTERS_PROXY,
RemoteClusterService.REMOTE_CLUSTER_SKIP_UNAVAILABLE,
RemoteClusterService.REMOTE_CONNECTIONS_PER_CLUSTER,
RemoteClusterService.REMOTE_INITIAL_CONNECTION_TIMEOUT_SETTING,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1009,6 +1009,10 @@ public static Setting<String> simpleString(String key, Property... properties) {
return new Setting<>(key, s -> "", Function.identity(), properties);
}

public static Setting<String> simpleString(String key, Function<String, String> parser, Property... properties) {
return new Setting<>(key, s -> "", parser, properties);
}

public static Setting<String> simpleString(String key, Setting<String> fallback, Property... properties) {
return new Setting<>(key, fallback, Function.identity(), properties);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,14 @@
*/
package org.elasticsearch.transport;

import java.util.EnumSet;
import java.util.function.Supplier;
import org.elasticsearch.Version;
import org.elasticsearch.cluster.metadata.ClusterNameExpressionResolver;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.UUIDs;
import org.elasticsearch.common.collect.Tuple;
import org.elasticsearch.common.component.AbstractComponent;
import org.elasticsearch.common.settings.ClusterSettings;
import org.elasticsearch.common.settings.Setting;
Expand Down Expand Up @@ -66,6 +70,22 @@ public abstract class RemoteClusterAware extends AbstractComponent {
public static final char REMOTE_CLUSTER_INDEX_SEPARATOR = ':';
public static final String LOCAL_CLUSTER_GROUP_KEY = "";

/**
* A proxy address for the remote cluster.
* NOTE: this settings is undocumented until we have at last one transport that supports passing
* on the hostname via a mechanism like SNI.
*/
public static final Setting.AffixSetting<String> REMOTE_CLUSTERS_PROXY = Setting.affixKeySetting(
"search.remote.",
"proxy",
key -> Setting.simpleString(key, s -> {
if (Strings.hasLength(s)) {
parsePort(s);
}
return s;
}, Setting.Property.NodeScope, Setting.Property.Dynamic), REMOTE_CLUSTERS_SEEDS);


protected final ClusterNameExpressionResolver clusterNameResolver;

/**
Expand All @@ -77,25 +97,37 @@ protected RemoteClusterAware(Settings settings) {
this.clusterNameResolver = new ClusterNameExpressionResolver(settings);
}

protected static Map<String, List<Supplier<DiscoveryNode>>> buildRemoteClustersSeeds(Settings settings) {
protected static Map<String, Tuple<String, List<Supplier<DiscoveryNode>>>> buildRemoteClustersSeeds(Settings settings) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe a doc comment here? With the new work, it is really tough to figure out what is going on in this method.

Stream<Setting<List<String>>> allConcreteSettings = REMOTE_CLUSTERS_SEEDS.getAllConcreteSettings(settings);
return allConcreteSettings.collect(
Collectors.toMap(REMOTE_CLUSTERS_SEEDS::getNamespace, concreteSetting -> {
String clusterName = REMOTE_CLUSTERS_SEEDS.getNamespace(concreteSetting);
List<String> addresses = concreteSetting.get(settings);
final boolean proxyMode = REMOTE_CLUSTERS_PROXY.getConcreteSettingForNamespace(clusterName).exists(settings);
List<Supplier<DiscoveryNode>> nodes = new ArrayList<>(addresses.size());
for (String address : addresses) {
nodes.add(() -> {
TransportAddress transportAddress = new TransportAddress(RemoteClusterAware.parseSeedAddress(address));
return new DiscoveryNode(clusterName + "#" + transportAddress.toString(),
transportAddress,
Version.CURRENT.minimumCompatibilityVersion());
});
nodes.add(() -> buildSeedNode(clusterName, address, proxyMode));
}
return nodes;
return new Tuple<>(REMOTE_CLUSTERS_PROXY.getConcreteSettingForNamespace(clusterName).get(settings), nodes);
}));
}

public static DiscoveryNode buildSeedNode(String clusterName, String address, boolean proxyMode) {
if (proxyMode) {
TransportAddress transportAddress = new TransportAddress(TransportAddress.META_ADDRESS, 0);
String hostName = address.substring(0, indexOfPortSeparator(address));
return new DiscoveryNode("", clusterName + "#" + address, UUIDs.randomBase64UUID(), hostName, address,
transportAddress, Collections
.emptyMap(), EnumSet.allOf(DiscoveryNode.Role.class),
Version.CURRENT.minimumCompatibilityVersion());
} else {
TransportAddress transportAddress = new TransportAddress(RemoteClusterAware.parseSeedAddress(address));
return new DiscoveryNode(clusterName + "#" + transportAddress.toString(),
transportAddress,
Version.CURRENT.minimumCompatibilityVersion());
}
}

/**
* Groups indices per cluster by splitting remote cluster-alias, index-name pairs on {@link #REMOTE_CLUSTER_INDEX_SEPARATOR}. All
* indices per cluster are collected as a list in the returned map keyed by the cluster alias. Local indices are grouped under
Expand Down Expand Up @@ -138,20 +170,24 @@ public Map<String, List<String>> groupClusterIndices(String[] requestIndices, Pr

protected abstract Set<String> getRemoteClusterNames();


/**
* Subclasses must implement this to receive information about updated cluster aliases. If the given address list is
* empty the cluster alias is unregistered and should be removed.
*/
protected abstract void updateRemoteCluster(String clusterAlias, List<String> addresses);
protected abstract void updateRemoteCluster(String clusterAlias, List<String> addresses, String proxy);

/**
* Registers this instance to listen to updates on the cluster settings.
*/
public void listenForUpdates(ClusterSettings clusterSettings) {
clusterSettings.addAffixUpdateConsumer(RemoteClusterAware.REMOTE_CLUSTERS_SEEDS, this::updateRemoteCluster,
clusterSettings.addAffixUpdateConsumer(RemoteClusterAware.REMOTE_CLUSTERS_PROXY,
RemoteClusterAware.REMOTE_CLUSTERS_SEEDS,
(key, value) -> updateRemoteCluster(key, value.v2(), value.v1()),
(namespace, value) -> {});
}


protected static InetSocketAddress parseSeedAddress(String remoteHost) {
String host = remoteHost.substring(0, indexOfPortSeparator(remoteHost));
InetAddress hostAddress;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
*/
package org.elasticsearch.transport;

import java.net.InetSocketAddress;
import java.util.function.Supplier;
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.apache.lucene.store.AlreadyClosedException;
Expand Down Expand Up @@ -88,6 +89,7 @@ final class RemoteClusterConnection extends AbstractComponent implements Transpo
private final int maxNumRemoteConnections;
private final Predicate<DiscoveryNode> nodePredicate;
private final ThreadPool threadPool;
private volatile String proxyAddress;
private volatile List<Supplier<DiscoveryNode>> seedNodes;
private volatile boolean skipUnavailable;
private final ConnectHandler connectHandler;
Expand All @@ -106,6 +108,13 @@ final class RemoteClusterConnection extends AbstractComponent implements Transpo
RemoteClusterConnection(Settings settings, String clusterAlias, List<Supplier<DiscoveryNode>> seedNodes,
TransportService transportService, ConnectionManager connectionManager, int maxNumRemoteConnections,
Predicate<DiscoveryNode> nodePredicate) {
this(settings, clusterAlias, seedNodes, transportService, connectionManager, maxNumRemoteConnections, nodePredicate, null);
}

RemoteClusterConnection(Settings settings, String clusterAlias, List<Supplier<DiscoveryNode>> seedNodes,
TransportService transportService, ConnectionManager connectionManager, int maxNumRemoteConnections, Predicate<DiscoveryNode>
nodePredicate,
String proxyAddress) {
super(settings);
this.transportService = transportService;
this.maxNumRemoteConnections = maxNumRemoteConnections;
Expand All @@ -130,13 +139,26 @@ final class RemoteClusterConnection extends AbstractComponent implements Transpo
connectionManager.addListener(this);
// we register the transport service here as a listener to make sure we notify handlers on disconnect etc.
connectionManager.addListener(transportService);
this.proxyAddress = proxyAddress;
}

private static DiscoveryNode maybeAddProxyAddress(String proxyAddress, DiscoveryNode node) {
if (proxyAddress == null || proxyAddress.isEmpty()) {
return node;
} else {
// resovle proxy address lazy here
InetSocketAddress proxyInetAddress = RemoteClusterAware.parseSeedAddress(proxyAddress);
return new DiscoveryNode(node.getName(), node.getId(), node.getEphemeralId(), node.getHostName(), node
.getHostAddress(), new TransportAddress(proxyInetAddress), node.getAttributes(), node.getRoles(), node.getVersion());
}
}

/**
* Updates the list of seed nodes for this cluster connection
*/
synchronized void updateSeedNodes(List<Supplier<DiscoveryNode>> seedNodes, ActionListener<Void> connectListener) {
synchronized void updateSeedNodes(String proxyAddress, List<Supplier<DiscoveryNode>> seedNodes, ActionListener<Void> connectListener) {
this.seedNodes = Collections.unmodifiableList(new ArrayList<>(seedNodes));
this.proxyAddress = proxyAddress;
connectHandler.connect(connectListener);
}

Expand Down Expand Up @@ -281,6 +303,7 @@ Transport.Connection getConnection(DiscoveryNode remoteClusterNode) {
return new ProxyConnection(connection, remoteClusterNode);
}


static final class ProxyConnection implements Transport.Connection {
private final Transport.Connection proxyConnection;
private final DiscoveryNode targetNode;
Expand Down Expand Up @@ -461,7 +484,7 @@ private void collectRemoteNodes(Iterator<Supplier<DiscoveryNode>> seedNodes,
try {
if (seedNodes.hasNext()) {
cancellableThreads.executeIO(() -> {
final DiscoveryNode seedNode = seedNodes.next().get();
final DiscoveryNode seedNode = maybeAddProxyAddress(proxyAddress, seedNodes.next().get());
final TransportService.HandshakeResponse handshakeResponse;
Transport.Connection connection = manager.openConnection(seedNode,
ConnectionProfile.buildSingleChannelProfile(TransportRequestOptions.Type.REG, null, null));
Expand All @@ -476,7 +499,7 @@ private void collectRemoteNodes(Iterator<Supplier<DiscoveryNode>> seedNodes,
throw ex;
}

final DiscoveryNode handshakeNode = handshakeResponse.getDiscoveryNode();
final DiscoveryNode handshakeNode = maybeAddProxyAddress(proxyAddress, handshakeResponse.getDiscoveryNode());
if (nodePredicate.test(handshakeNode) && connectedNodes.size() < maxNumRemoteConnections) {
manager.connectToNode(handshakeNode, remoteProfile, transportService.connectionValidator(handshakeNode));
if (remoteClusterName.get() == null) {
Expand Down Expand Up @@ -583,7 +606,8 @@ public void handleResponse(ClusterStateResponse response) {
cancellableThreads.executeIO(() -> {
DiscoveryNodes nodes = response.getState().nodes();
Iterable<DiscoveryNode> nodesIter = nodes.getNodes()::valuesIt;
for (DiscoveryNode node : nodesIter) {
for (DiscoveryNode n : nodesIter) {
DiscoveryNode node = maybeAddProxyAddress(proxyAddress, n);
if (nodePredicate.test(node) && connectedNodes.size() < maxNumRemoteConnections) {
try {
connectionManager.connectToNode(node, remoteProfile,
Expand Down Expand Up @@ -646,7 +670,8 @@ void addConnectedNode(DiscoveryNode node) {
* Get the information about remote nodes to be rendered on {@code _remote/info} requests.
*/
public RemoteConnectionInfo getConnectionInfo() {
List<TransportAddress> seedNodeAddresses = seedNodes.stream().map(node -> node.get().getAddress()).collect(Collectors.toList());
List<TransportAddress> seedNodeAddresses = seedNodes.stream().map(node -> node.get().getAddress()).collect
(Collectors.toList());
TimeValue initialConnectionTimeout = RemoteClusterService.REMOTE_INITIAL_CONNECTION_TIMEOUT_SETTING.get(settings);
return new RemoteConnectionInfo(clusterAlias, seedNodeAddresses, maxNumRemoteConnections, connectedNodes.size(),
initialConnectionTimeout, skipUnavailable);
Expand Down
Loading