NETWORKING: Make RemoteClusterConn. Lazy Resolve DNS #32764

Merged
merged 16 commits into from
Aug 18, 2018
Changes from 9 commits
@@ -18,6 +18,7 @@
*/
package org.elasticsearch.transport;

import java.util.function.Supplier;
import org.elasticsearch.Version;
import org.elasticsearch.cluster.metadata.ClusterNameExpressionResolver;
import org.elasticsearch.cluster.node.DiscoveryNode;
@@ -48,9 +49,19 @@ public abstract class RemoteClusterAware extends AbstractComponent {
/**
* A list of initial seed nodes to discover eligible nodes from the remote cluster
*/
public static final Setting.AffixSetting<List<InetSocketAddress>> REMOTE_CLUSTERS_SEEDS = Setting.affixKeySetting("search.remote.",
"seeds", (key) -> Setting.listSetting(key, Collections.emptyList(), RemoteClusterAware::parseSeedAddress,
Setting.Property.NodeScope, Setting.Property.Dynamic));
public static final Setting.AffixSetting<List<String>> REMOTE_CLUSTERS_SEEDS = Setting.affixKeySetting(
"search.remote.",
"seeds",
key -> Setting.listSetting(
key, Collections.emptyList(),
s -> {
// validate seed address
RemoteClusterAware.parseSeedAddress(s);
Member

So this seems problematic. It means, for example, if a host fails to resolve, we mark the setting as invalid. Yet, it could resolve for us later?

Member Author

@jasontedor yeah, I guess we could leave it out. I mainly added this because there was an explicit test for this behavior.
We could just delete it and drop the validation. I could go either way here:
Yes, the host could resolve later, so maybe it's fine not to validate. On the other hand, you could make an annoying typo in a hostname and you wouldn't catch it right away. It probably comes down to how often, if ever, someone would configure a hostname that doesn't resolve yet. Also, if you don't have the DNS caching TTL configured, not having this check is kind of annoying, because the hostname will never resolve and you might rather want this to fail right away.
=> You decide :)

Member

If a hostname fails to resolve at time of use (i.e., when we try to connect to the remote cluster), we would have a failure message in the logs indicating the problem? That is, the unknown host exception would be there?

I lean towards saying we leave the lookup out of validation, yet I think the remainder of the validation is good.
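
The point about failures surfacing at time of use can be illustrated with a small sketch. It uses only JDK types; `lazySeed` is a hypothetical helper that is not part of this PR, and the exception type and message are assumptions chosen for illustration. The lookup runs inside `get()`, so an unresolvable host is only reported when a connection attempt actually asks for the address.

```java
import java.net.InetSocketAddress;
import java.util.function.Supplier;

final class LazySeedResolution {

    // Hypothetical helper (not from the PR): defers the DNS lookup until get() is called.
    static Supplier<InetSocketAddress> lazySeed(String host, int port) {
        return () -> {
            // This constructor attempts to resolve the hostname; on failure the
            // address is flagged as unresolved instead of throwing.
            InetSocketAddress address = new InetSocketAddress(host, port);
            if (address.isUnresolved()) {
                // Assumed error handling: surface the problem at time of use.
                throw new IllegalArgumentException("unknown host [" + host + "]");
            }
            return address;
        };
    }

    public static void main(String[] args) {
        // Building the supplier performs no lookup; an IP literal needs no DNS at all.
        Supplier<InetSocketAddress> seed = lazySeed("127.0.0.1", 9300);
        System.out.println(seed.get().getPort());
    }
}
```

Because each `get()` repeats the lookup, a host that starts resolving later is picked up on the next attempt, subject to whatever DNS caching the JVM is configured with.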

Member Author

@jasontedor sounds good, will adjust accordingly in a bit :)

Member Author

@jasontedor done in a22994a, extracted the port parsing from the address parsing to get all validation except for the hostname lookup into a separate method :)
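
For readers without the commit at hand, the split described above might look roughly like the following. This is a sketch under stated assumptions, not the code from a22994a: the class name, the method name `parsePort`, the port range check and its messages are all hypothetical. Only the `[host:port]` message is taken from the diff. Unlike the diff, the sketch detects a trailing colon with `length() - 1`.

```java
final class SeedAddressValidation {

    // Hypothetical helper: validates the [host:port] shape and the port value
    // without performing any hostname lookup.
    static int parsePort(String remoteHost) {
        // Use the last colon so that IPv6 literals such as [::1]:9300 work.
        int portSeparator = remoteHost.lastIndexOf(':');
        if (portSeparator == -1 || portSeparator == remoteHost.length() - 1) {
            throw new IllegalArgumentException(
                "remote hosts need to be configured as [host:port], found [" + remoteHost + "] instead");
        }
        String portString = remoteHost.substring(portSeparator + 1);
        final int port;
        try {
            port = Integer.parseInt(portString);
        } catch (NumberFormatException e) {
            // Assumed message, for illustration only.
            throw new IllegalArgumentException("port must be a number, found [" + portString + "]", e);
        }
        if (port <= 0 || port > 65535) {
            // Assumed range check, for illustration only.
            throw new IllegalArgumentException("port must be in the range 1-65535, found [" + port + "]");
        }
        return port;
    }
}
```

With this shape, a seed such as `no-such-host.invalid:9300` passes setting validation, while `localhost` (no port) or `localhost:` is rejected immediately.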

return s;
},
Setting.Property.NodeScope, Setting.Property.Dynamic
Member

These could probably be on a separate line each, too. 😇

Member Author

sure will do :)

)
);
public static final char REMOTE_CLUSTER_INDEX_SEPARATOR = ':';
public static final String LOCAL_CLUSTER_GROUP_KEY = "";

@@ -65,18 +76,19 @@ protected RemoteClusterAware(Settings settings) {
this.clusterNameResolver = new ClusterNameExpressionResolver(settings);
}

protected static Map<String, List<DiscoveryNode>> buildRemoteClustersSeeds(Settings settings) {
Stream<Setting<List<InetSocketAddress>>> allConcreteSettings = REMOTE_CLUSTERS_SEEDS.getAllConcreteSettings(settings);
protected static Map<String, List<Supplier<DiscoveryNode>>> buildRemoteClustersSeeds(Settings settings) {
Stream<Setting<List<String>>> allConcreteSettings = REMOTE_CLUSTERS_SEEDS.getAllConcreteSettings(settings);
return allConcreteSettings.collect(
Collectors.toMap(REMOTE_CLUSTERS_SEEDS::getNamespace, concreteSetting -> {
String clusterName = REMOTE_CLUSTERS_SEEDS.getNamespace(concreteSetting);
List<DiscoveryNode> nodes = new ArrayList<>();
for (InetSocketAddress address : concreteSetting.get(settings)) {
TransportAddress transportAddress = new TransportAddress(address);
DiscoveryNode node = new DiscoveryNode(clusterName + "#" + transportAddress.toString(),
transportAddress,
Version.CURRENT.minimumCompatibilityVersion());
nodes.add(node);
List<Supplier<DiscoveryNode>> nodes = new ArrayList<>();
Member

We can pre-size the list?

Member Author

sure will do :)
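
A minimal sketch of the pre-sizing suggestion, assuming the number of configured seeds is known before the loop. `Supplier<String>` stands in for `Supplier<DiscoveryNode>` so the example runs without Elasticsearch on the classpath; the class and method names are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Supplier;

final class PresizedSeeds {

    // Hypothetical stand-in for the loop in buildRemoteClustersSeeds.
    static List<Supplier<String>> toSuppliers(List<String> addresses) {
        // The seed count is known up front, so the backing array is allocated once
        // instead of growing while elements are added.
        List<Supplier<String>> nodes = new ArrayList<>(addresses.size());
        for (String address : addresses) {
            // Each supplier captures its address; nothing is evaluated here.
            nodes.add(() -> address);
        }
        return nodes;
    }
}
```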

for (String address : concreteSetting.get(settings)) {
nodes.add(() -> {
TransportAddress transportAddress = new TransportAddress(RemoteClusterAware.parseSeedAddress(address));
return new DiscoveryNode(clusterName + "#" + transportAddress.toString(),
transportAddress,
Version.CURRENT.minimumCompatibilityVersion());
});
}
return nodes;
}));
@@ -128,7 +140,7 @@ public Map<String, List<String>> groupClusterIndices(String[] requestIndices, Pr
* Subclasses must implement this to receive information about updated cluster aliases. If the given address list is
* empty the cluster alias is unregistered and should be removed.
*/
protected abstract void updateRemoteCluster(String clusterAlias, List<InetSocketAddress> addresses);
protected abstract void updateRemoteCluster(String clusterAlias, List<String> addresses);

/**
* Registers this instance to listen to updates on the cluster settings.
@@ -138,7 +150,7 @@ public void listenForUpdates(ClusterSettings clusterSettings) {
(namespace, value) -> {});
}

private static InetSocketAddress parseSeedAddress(String remoteHost) {
protected static InetSocketAddress parseSeedAddress(String remoteHost) {
int portSeparator = remoteHost.lastIndexOf(':'); // in case we have a IPv6 address ie. [::1]:9300
if (portSeparator == -1 || portSeparator == remoteHost.length()) {
throw new IllegalArgumentException("remote hosts need to be configured as [host:port], found [" + remoteHost + "] instead");
@@ -18,6 +18,7 @@
*/
package org.elasticsearch.transport;

import java.util.function.Supplier;
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.apache.lucene.store.AlreadyClosedException;
import org.apache.lucene.util.SetOnce;
@@ -84,7 +85,7 @@ final class RemoteClusterConnection extends AbstractComponent implements Transpo
private final String clusterAlias;
private final int maxNumRemoteConnections;
private final Predicate<DiscoveryNode> nodePredicate;
private volatile List<DiscoveryNode> seedNodes;
private volatile List<Supplier<DiscoveryNode>> seedNodes;
private volatile boolean skipUnavailable;
private final ConnectHandler connectHandler;
private SetOnce<ClusterName> remoteClusterName = new SetOnce<>();
@@ -99,7 +100,7 @@ final class RemoteClusterConnection extends AbstractComponent implements Transpo
* @param maxNumRemoteConnections the maximum number of connections to the remote cluster
* @param nodePredicate a predicate to filter eligible remote nodes to connect to
*/
RemoteClusterConnection(Settings settings, String clusterAlias, List<DiscoveryNode> seedNodes,
RemoteClusterConnection(Settings settings, String clusterAlias, List<Supplier<DiscoveryNode>> seedNodes,
TransportService transportService, int maxNumRemoteConnections, Predicate<DiscoveryNode> nodePredicate) {
super(settings);
this.localClusterName = ClusterName.CLUSTER_NAME_SETTING.get(settings);
@@ -127,7 +128,7 @@ final class RemoteClusterConnection extends AbstractComponent implements Transpo
/**
* Updates the list of seed nodes for this cluster connection
*/
synchronized void updateSeedNodes(List<DiscoveryNode> seedNodes, ActionListener<Void> connectListener) {
synchronized void updateSeedNodes(List<Supplier<DiscoveryNode>> seedNodes, ActionListener<Void> connectListener) {
this.seedNodes = Collections.unmodifiableList(new ArrayList<>(seedNodes));
connectHandler.connect(connectListener);
}
@@ -456,15 +457,15 @@ protected void doRun() {
});
}

void collectRemoteNodes(Iterator<DiscoveryNode> seedNodes,
private void collectRemoteNodes(Iterator<Supplier<DiscoveryNode>> seedNodes,
final TransportService transportService, ActionListener<Void> listener) {
if (Thread.currentThread().isInterrupted()) {
listener.onFailure(new InterruptedException("remote connect thread got interrupted"));
}
try {
if (seedNodes.hasNext()) {
cancellableThreads.executeIO(() -> {
final DiscoveryNode seedNode = seedNodes.next();
final DiscoveryNode seedNode = seedNodes.next().get();
final TransportService.HandshakeResponse handshakeResponse;
Transport.Connection connection = transportService.openConnection(seedNode,
ConnectionProfile.buildSingleChannelProfile(TransportRequestOptions.Type.REG, null, null));
@@ -554,11 +555,11 @@ private class SniffClusterStateResponseHandler implements TransportResponseHandl
private final TransportService transportService;
private final Transport.Connection connection;
private final ActionListener<Void> listener;
private final Iterator<DiscoveryNode> seedNodes;
private final Iterator<Supplier<DiscoveryNode>> seedNodes;
private final CancellableThreads cancellableThreads;

SniffClusterStateResponseHandler(TransportService transportService, Transport.Connection connection,
ActionListener<Void> listener, Iterator<DiscoveryNode> seedNodes,
ActionListener<Void> listener, Iterator<Supplier<DiscoveryNode>> seedNodes,
CancellableThreads cancellableThreads) {
this.transportService = transportService;
this.connection = connection;
@@ -651,7 +652,7 @@ void addConnectedNode(DiscoveryNode node) {
* Get the information about remote nodes to be rendered on {@code _remote/info} requests.
*/
public RemoteConnectionInfo getConnectionInfo() {
List<TransportAddress> seedNodeAddresses = seedNodes.stream().map(DiscoveryNode::getAddress).collect(Collectors.toList());
List<TransportAddress> seedNodeAddresses = seedNodes.stream().map(node -> node.get().getAddress()).collect(Collectors.toList());
TimeValue initialConnectionTimeout = RemoteClusterService.REMOTE_INITIAL_CONNECTION_TIMEOUT_SETTING.get(settings);
return new RemoteConnectionInfo(clusterAlias, seedNodeAddresses, maxNumRemoteConnections, connectedNodes.size(),
initialConnectionTimeout, skipUnavailable);
@@ -18,6 +18,7 @@
*/
package org.elasticsearch.transport;

import java.util.function.Supplier;
import org.elasticsearch.Version;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.OriginalIndices;
@@ -40,7 +41,6 @@

import java.io.Closeable;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
@@ -115,7 +115,8 @@ public final class RemoteClusterService extends RemoteClusterAware implements Cl
* @param seeds a cluster alias to discovery node mapping representing the remote clusters seeds nodes
* @param connectionListener a listener invoked once every configured cluster has been connected to
*/
private synchronized void updateRemoteClusters(Map<String, List<DiscoveryNode>> seeds, ActionListener<Void> connectionListener) {
private synchronized void updateRemoteClusters(Map<String, List<Supplier<DiscoveryNode>>> seeds,
ActionListener<Void> connectionListener) {
if (seeds.containsKey(LOCAL_CLUSTER_GROUP_KEY)) {
throw new IllegalArgumentException("remote clusters must not have the empty string as its key");
}
@@ -125,7 +126,7 @@ private synchronized void updateRemoteClusters(Map<String, List<DiscoveryNode>>
} else {
CountDown countDown = new CountDown(seeds.size());
remoteClusters.putAll(this.remoteClusters);
for (Map.Entry<String, List<DiscoveryNode>> entry : seeds.entrySet()) {
for (Map.Entry<String, List<Supplier<DiscoveryNode>>> entry : seeds.entrySet()) {
RemoteClusterConnection remote = this.remoteClusters.get(entry.getKey());
if (entry.getValue().isEmpty()) { // with no seed nodes we just remove the connection
try {
@@ -310,16 +311,17 @@ synchronized void updateSkipUnavailable(String clusterAlias, Boolean skipUnavail
}
}

protected void updateRemoteCluster(String clusterAlias, List<InetSocketAddress> addresses) {
@Override
protected void updateRemoteCluster(String clusterAlias, List<String> addresses) {
updateRemoteCluster(clusterAlias, addresses, ActionListener.wrap((x) -> {}, (x) -> {}));
}

void updateRemoteCluster(
final String clusterAlias,
final List<InetSocketAddress> addresses,
final List<String> addresses,
final ActionListener<Void> connectionListener) {
final List<DiscoveryNode> nodes = addresses.stream().map(address -> {
final TransportAddress transportAddress = new TransportAddress(address);
final List<Supplier<DiscoveryNode>> nodes = addresses.stream().<Supplier<DiscoveryNode>>map(address -> () -> {
final TransportAddress transportAddress = new TransportAddress(RemoteClusterAware.parseSeedAddress(address));
final String id = clusterAlias + "#" + transportAddress.toString();
final Version version = Version.CURRENT.minimumCompatibilityVersion();
return new DiscoveryNode(id, transportAddress, version);
@@ -334,7 +336,7 @@ void updateRemoteCluster(
void initializeRemoteClusters() {
final TimeValue timeValue = REMOTE_INITIAL_CONNECTION_TIMEOUT_SETTING.get(settings);
final PlainActionFuture<Void> future = new PlainActionFuture<>();
Map<String, List<DiscoveryNode>> seeds = RemoteClusterAware.buildRemoteClustersSeeds(settings);
Map<String, List<Supplier<DiscoveryNode>>> seeds = RemoteClusterAware.buildRemoteClustersSeeds(settings);
updateRemoteClusters(seeds, future);
try {
future.get(timeValue.millis(), TimeUnit.MILLISECONDS);