Skip to content

Commit

Permalink
Fix remote cluster seeds fallback (#34090)
Browse files Browse the repository at this point in the history
Recently we introduced the settings cluster.remote to take the place of
search.remote for configuring remote cluster connections. We made this
change due to the fact that we have generalized the remote cluster
infrastructure to also be used within cross-cluster replication and not
only cross-cluster search. For backwards compatibility, when we made this
change, we allowed that cluster.remote would fallback to
search.remote. Alas, the initial change for this contained a bug for
handling the proxy and seeds settings. The bug for the seeds settings
arose because we were manually iterating over the concrete settings only
for cluster.remote seeds but not for search.remote seeds. This commit
addresses this by iterating over both cluster.remote seeds and
search.remote seeds. Additionally, when checking for existence of proxy
settings, we have to not only check cluster.remote proxy settings, but
also fallback to search.remote proxy settings. This commit addresses
both issues, and adds tests for these situations.
  • Loading branch information
jasontedor authored and kcm committed Oct 30, 2018
1 parent b80763b commit bb1ca00
Show file tree
Hide file tree
Showing 2 changed files with 83 additions and 15 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -36,12 +36,16 @@
import java.net.InetSocketAddress;
import java.net.UnknownHostException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.EnumSet;
import java.util.HashMap;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.NavigableSet;
import java.util.Set;
import java.util.TreeSet;
import java.util.function.Predicate;
import java.util.function.Supplier;
import java.util.stream.Collectors;
Expand Down Expand Up @@ -181,12 +185,36 @@ protected RemoteClusterAware(Settings settings) {
* {@link TransportAddress#META_ADDRESS} and their configured address will be used as the hostname for the generated discovery node.
*/
protected static Map<String, Tuple<String, List<Supplier<DiscoveryNode>>>> buildRemoteClustersDynamicConfig(Settings settings) {
Stream<Setting<List<String>>> allConcreteSettings = REMOTE_CLUSTERS_SEEDS.getAllConcreteSettings(settings);
final Map<String, Tuple<String, List<Supplier<DiscoveryNode>>>> remoteSeeds =
buildRemoteClustersDynamicConfig(settings, REMOTE_CLUSTERS_SEEDS);
final Map<String, Tuple<String, List<Supplier<DiscoveryNode>>>> searchRemoteSeeds =
buildRemoteClustersDynamicConfig(settings, SEARCH_REMOTE_CLUSTERS_SEEDS);
// sort the intersection for predictable output order
final NavigableSet<String> intersection =
new TreeSet<>(Arrays.asList(
searchRemoteSeeds.keySet().stream().filter(s -> remoteSeeds.keySet().contains(s)).sorted().toArray(String[]::new)));
if (intersection.isEmpty() == false) {
final String message = String.format(
Locale.ROOT,
"found duplicate remote cluster configurations for cluster alias%s [%s]",
intersection.size() == 1 ? "" : "es",
String.join(",", intersection));
throw new IllegalArgumentException(message);
}
return Stream
.concat(remoteSeeds.entrySet().stream(), searchRemoteSeeds.entrySet().stream())
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
}

private static Map<String, Tuple<String, List<Supplier<DiscoveryNode>>>> buildRemoteClustersDynamicConfig(
final Settings settings, final Setting.AffixSetting<List<String>> seedsSetting) {
final Stream<Setting<List<String>>> allConcreteSettings = seedsSetting.getAllConcreteSettings(settings);
return allConcreteSettings.collect(
Collectors.toMap(REMOTE_CLUSTERS_SEEDS::getNamespace, concreteSetting -> {
String clusterName = REMOTE_CLUSTERS_SEEDS.getNamespace(concreteSetting);
Collectors.toMap(seedsSetting::getNamespace, concreteSetting -> {
String clusterName = seedsSetting.getNamespace(concreteSetting);
List<String> addresses = concreteSetting.get(settings);
final boolean proxyMode = REMOTE_CLUSTERS_PROXY.getConcreteSettingForNamespace(clusterName).exists(settings);
final boolean proxyMode =
REMOTE_CLUSTERS_PROXY.getConcreteSettingForNamespace(clusterName).existsOrFallbackExists(settings);
List<Supplier<DiscoveryNode>> nodes = new ArrayList<>(addresses.size());
for (String address : addresses) {
nodes.add(() -> buildSeedNode(clusterName, address, proxyMode));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,10 @@
import static org.hamcrest.CoreMatchers.containsString;
import static org.hamcrest.CoreMatchers.instanceOf;
import static org.hamcrest.Matchers.anyOf;
import static org.hamcrest.Matchers.containsInAnyOrder;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.hasSize;
import static org.hamcrest.Matchers.hasToString;
import static org.hamcrest.Matchers.startsWith;

public class RemoteClusterServiceTests extends ESTestCase {
Expand Down Expand Up @@ -120,17 +123,19 @@ public void testRemoteClusterSeedSetting() {

public void testBuildRemoteClustersDynamicConfig() throws Exception {
Map<String, Tuple<String, List<Supplier<DiscoveryNode>>>> map = RemoteClusterService.buildRemoteClustersDynamicConfig(
Settings.builder().put("cluster.remote.foo.seeds", "192.168.0.1:8080")
.put("cluster.remote.bar.seeds", "[::1]:9090")
.put("cluster.remote.boom.seeds", "boom-node1.internal:1000")
.put("cluster.remote.boom.proxy", "foo.bar.com:1234").build());
assertEquals(3, map.size());
assertTrue(map.containsKey("foo"));
assertTrue(map.containsKey("bar"));
assertTrue(map.containsKey("boom"));
assertEquals(1, map.get("foo").v2().size());
assertEquals(1, map.get("bar").v2().size());
assertEquals(1, map.get("boom").v2().size());
Settings.builder()
.put("cluster.remote.foo.seeds", "192.168.0.1:8080")
.put("cluster.remote.bar.seeds", "[::1]:9090")
.put("cluster.remote.boom.seeds", "boom-node1.internal:1000")
.put("cluster.remote.boom.proxy", "foo.bar.com:1234")
.put("search.remote.quux.seeds", "quux:9300")
.put("search.remote.quux.proxy", "quux-proxy:19300")
.build());
assertThat(map.keySet(), containsInAnyOrder(equalTo("foo"), equalTo("bar"), equalTo("boom"), equalTo("quux")));
assertThat(map.get("foo").v2(), hasSize(1));
assertThat(map.get("bar").v2(), hasSize(1));
assertThat(map.get("boom").v2(), hasSize(1));
assertThat(map.get("quux").v2(), hasSize(1));

DiscoveryNode foo = map.get("foo").v2().get(0).get();
assertEquals("", map.get("foo").v1());
Expand All @@ -150,6 +155,41 @@ public void testBuildRemoteClustersDynamicConfig() throws Exception {
assertEquals(boom.getId(), "boom#boom-node1.internal:1000");
assertEquals("foo.bar.com:1234", map.get("boom").v1());
assertEquals(boom.getVersion(), Version.CURRENT.minimumCompatibilityVersion());

DiscoveryNode quux = map.get("quux").v2().get(0).get();
assertEquals(quux.getAddress(), new TransportAddress(TransportAddress.META_ADDRESS, 0));
assertEquals("quux", quux.getHostName());
assertEquals(quux.getId(), "quux#quux:9300");
assertEquals("quux-proxy:19300", map.get("quux").v1());
assertEquals(quux.getVersion(), Version.CURRENT.minimumCompatibilityVersion());

assertSettingDeprecationsAndWarnings(new String[]{"search.remote.quux.seeds", "search.remote.quux.proxy"});
}

public void testBuildRemoteClustersDynamicConfigWithDuplicate() {
final IllegalArgumentException e = expectThrows(
IllegalArgumentException.class,
() -> RemoteClusterService.buildRemoteClustersDynamicConfig(
Settings.builder()
.put("cluster.remote.foo.seeds", "192.168.0.1:8080")
.put("search.remote.foo.seeds", "192.168.0.1:8080")
.build()));
assertThat(e, hasToString(containsString("found duplicate remote cluster configurations for cluster alias [foo]")));
assertSettingDeprecationsAndWarnings(new String[]{"search.remote.foo.seeds"});
}

public void testBuildRemoteClustersDynamicConfigWithDuplicates() {
final IllegalArgumentException e = expectThrows(
IllegalArgumentException.class,
() -> RemoteClusterService.buildRemoteClustersDynamicConfig(
Settings.builder()
.put("cluster.remote.foo.seeds", "192.168.0.1:8080")
.put("search.remote.foo.seeds", "192.168.0.1:8080")
.put("cluster.remote.bar.seeds", "192.168.0.1:8080")
.put("search.remote.bar.seeds", "192.168.0.1:8080")
.build()));
assertThat(e, hasToString(containsString("found duplicate remote cluster configurations for cluster aliases [bar,foo]")));
assertSettingDeprecationsAndWarnings(new String[]{"search.remote.bar.seeds", "search.remote.foo.seeds"});
}

public void testGroupClusterIndices() throws IOException {
Expand Down

0 comments on commit bb1ca00

Please sign in to comment.