From e747326b0482667b4d10867adca694fc5b67202d Mon Sep 17 00:00:00 2001 From: Luca Cavanna Date: Fri, 3 May 2019 11:15:24 +0200 Subject: [PATCH] Adapt low-level REST client to java 8 (#41537) As a follow-up to #38540 we can use lambda functions and method references where convenient in the low-level REST client. Also, we need to update the docs to state that the minimum java version required is 1.8. --- .../elasticsearch/client/DeadHostState.java | 37 +++------- .../org/elasticsearch/client/Request.java | 1 - .../elasticsearch/client/RequestOptions.java | 4 +- .../org/elasticsearch/client/RestClient.java | 38 +++-------- .../client/RestClientBuilder.java | 15 +---- .../client/DeadHostStateTests.java | 67 +++++-------------- .../elasticsearch/client/RestClientTests.java | 14 ++-- docs/java-rest/low-level/usage.asciidoc | 2 +- 8 files changed, 51 insertions(+), 127 deletions(-) diff --git a/client/rest/src/main/java/org/elasticsearch/client/DeadHostState.java b/client/rest/src/main/java/org/elasticsearch/client/DeadHostState.java index 2a62adb285ad6..bff397a1c2c81 100644 --- a/client/rest/src/main/java/org/elasticsearch/client/DeadHostState.java +++ b/client/rest/src/main/java/org/elasticsearch/client/DeadHostState.java @@ -20,6 +20,7 @@ package org.elasticsearch.client; import java.util.concurrent.TimeUnit; +import java.util.function.Supplier; /** * Holds the state of a dead connection to a host. Keeps track of how many failed attempts were performed and @@ -30,10 +31,11 @@ final class DeadHostState implements Comparable { private static final long MIN_CONNECTION_TIMEOUT_NANOS = TimeUnit.MINUTES.toNanos(1); static final long MAX_CONNECTION_TIMEOUT_NANOS = TimeUnit.MINUTES.toNanos(30); + static final Supplier DEFAULT_TIME_SUPPLIER = System::nanoTime; private final int failedAttempts; private final long deadUntilNanos; - private final TimeSupplier timeSupplier; + private final Supplier timeSupplier; /** * Build the initial dead state of a host. Useful when a working host stops functioning @@ -41,9 +43,9 @@ final class DeadHostState implements Comparable { * * @param timeSupplier a way to supply the current time and allow for unit testing */ - DeadHostState(TimeSupplier timeSupplier) { + DeadHostState(Supplier timeSupplier) { this.failedAttempts = 1; - this.deadUntilNanos = timeSupplier.nanoTime() + MIN_CONNECTION_TIMEOUT_NANOS; + this.deadUntilNanos = timeSupplier.get() + MIN_CONNECTION_TIMEOUT_NANOS; this.timeSupplier = timeSupplier; } @@ -51,14 +53,14 @@ final class DeadHostState implements Comparable { * Build the dead state of a host given its previous dead state. Useful when a host has been failing before, hence * it already failed for one or more consecutive times. The more failed attempts we register the longer we wait * to retry that same host again. Minimum is 1 minute (for a node the only failed once created - * through {@link #DeadHostState(TimeSupplier)}), maximum is 30 minutes (for a node that failed more than 10 consecutive times) + * through {@link #DeadHostState(Supplier)}), maximum is 30 minutes (for a node that failed more than 10 consecutive times) * * @param previousDeadHostState the previous state of the host which allows us to increase the wait till the next retry attempt */ DeadHostState(DeadHostState previousDeadHostState) { long timeoutNanos = (long)Math.min(MIN_CONNECTION_TIMEOUT_NANOS * 2 * Math.pow(2, previousDeadHostState.failedAttempts * 0.5 - 1), MAX_CONNECTION_TIMEOUT_NANOS); - this.deadUntilNanos = previousDeadHostState.timeSupplier.nanoTime() + timeoutNanos; + this.deadUntilNanos = previousDeadHostState.timeSupplier.get() + timeoutNanos; this.failedAttempts = previousDeadHostState.failedAttempts + 1; this.timeSupplier = previousDeadHostState.timeSupplier; } @@ -69,7 +71,7 @@ final class DeadHostState implements Comparable { * @return true if the host should be retried, false otherwise */ boolean shallBeRetried() { - return timeSupplier.nanoTime() - deadUntilNanos > 0; + return timeSupplier.get() - deadUntilNanos > 0; } /** @@ -87,8 +89,8 @@ int getFailedAttempts() { @Override public int compareTo(DeadHostState other) { if (timeSupplier != other.timeSupplier) { - throw new IllegalArgumentException("can't compare DeadHostStates with different clocks [" - + timeSupplier + " != " + other.timeSupplier + "]"); + throw new IllegalArgumentException("can't compare DeadHostStates holding different time suppliers as they may " + + "be based on different clocks"); } return Long.compare(deadUntilNanos, other.deadUntilNanos); } @@ -101,23 +103,4 @@ public String toString() { ", timeSupplier=" + timeSupplier + '}'; } - - /** - * Time supplier that makes timing aspects pluggable to ease testing - */ - interface TimeSupplier { - TimeSupplier DEFAULT = new TimeSupplier() { - @Override - public long nanoTime() { - return System.nanoTime(); - } - - @Override - public String toString() { - return "nanoTime"; - } - }; - - long nanoTime(); - } } diff --git a/client/rest/src/main/java/org/elasticsearch/client/Request.java b/client/rest/src/main/java/org/elasticsearch/client/Request.java index a6febe91ae8d0..2e4733201b12c 100644 --- a/client/rest/src/main/java/org/elasticsearch/client/Request.java +++ b/client/rest/src/main/java/org/elasticsearch/client/Request.java @@ -74,7 +74,6 @@ public String getEndpoint() { */ public void addParameter(String name, String value) { Objects.requireNonNull(name, "url parameter name cannot be null"); - // .putIfAbsent(name, value) except we are in Java 7 which doesn't have that. if (parameters.containsKey(name)) { throw new IllegalArgumentException("url parameter [" + name + "] has already been set to [" + parameters.get(name) + "]"); } else { diff --git a/client/rest/src/main/java/org/elasticsearch/client/RequestOptions.java b/client/rest/src/main/java/org/elasticsearch/client/RequestOptions.java index f06c375d4302d..0b2cdce3d52f7 100644 --- a/client/rest/src/main/java/org/elasticsearch/client/RequestOptions.java +++ b/client/rest/src/main/java/org/elasticsearch/client/RequestOptions.java @@ -19,8 +19,8 @@ package org.elasticsearch.client; -import org.apache.http.message.BasicHeader; import org.apache.http.Header; +import org.apache.http.message.BasicHeader; import org.apache.http.nio.protocol.HttpAsyncResponseConsumer; import org.elasticsearch.client.HttpAsyncResponseConsumerFactory.HeapBufferedResponseConsumerFactory; @@ -38,7 +38,7 @@ public final class RequestOptions { * Default request options. */ public static final RequestOptions DEFAULT = new Builder( - Collections.
emptyList(), HeapBufferedResponseConsumerFactory.DEFAULT, null).build(); + Collections.emptyList(), HeapBufferedResponseConsumerFactory.DEFAULT, null).build(); private final List
headers; private final HttpAsyncResponseConsumerFactory httpAsyncResponseConsumerFactory; diff --git a/client/rest/src/main/java/org/elasticsearch/client/RestClient.java b/client/rest/src/main/java/org/elasticsearch/client/RestClient.java index f0478742ecc82..38185ac960926 100644 --- a/client/rest/src/main/java/org/elasticsearch/client/RestClient.java +++ b/client/rest/src/main/java/org/elasticsearch/client/RestClient.java @@ -46,7 +46,6 @@ import org.apache.http.nio.client.methods.HttpAsyncMethods; import org.apache.http.nio.protocol.HttpAsyncRequestProducer; import org.apache.http.nio.protocol.HttpAsyncResponseConsumer; -import org.elasticsearch.client.DeadHostState.TimeSupplier; import javax.net.ssl.SSLHandshakeException; import java.io.Closeable; @@ -72,6 +71,7 @@ import java.util.concurrent.ConcurrentMap; import java.util.concurrent.ExecutionException; import java.util.concurrent.atomic.AtomicInteger; +import java.util.stream.Collectors; import static java.util.Collections.singletonList; @@ -139,7 +139,11 @@ public static RestClientBuilder builder(Node... nodes) { * @see Node#Node(HttpHost) */ public static RestClientBuilder builder(HttpHost... hosts) { - return new RestClientBuilder(hostsToNodes(hosts)); + if (hosts == null || hosts.length == 0) { + throw new IllegalArgumentException("hosts must not be null nor empty"); + } + List nodes = Arrays.stream(hosts).map(Node::new).collect(Collectors.toList()); + return new RestClientBuilder(nodes); } /** @@ -163,17 +167,6 @@ public synchronized void setNodes(Collection nodes) { this.blacklist.clear(); } - private static List hostsToNodes(HttpHost[] hosts) { - if (hosts == null || hosts.length == 0) { - throw new IllegalArgumentException("hosts must not be null nor empty"); - } - List nodes = new ArrayList<>(hosts.length); - for (HttpHost host : hosts) { - nodes.add(new Node(host)); - } - return nodes; - } - /** * Get the list of nodes that the client knows about. The list is * unmodifiable. @@ -369,15 +362,11 @@ static Iterable selectNodes(NodeTuple> nodeTuple, Map deadNodes = new ArrayList<>(blacklist.size()); for (Node node : nodeTuple.nodes) { DeadHostState deadness = blacklist.get(node.getHost()); - if (deadness == null) { - livingNodes.add(node); - continue; - } - if (deadness.shallBeRetried()) { + if (deadness == null || deadness.shallBeRetried()) { livingNodes.add(node); - continue; + } else { + deadNodes.add(new DeadNode(node, deadness)); } - deadNodes.add(new DeadNode(node, deadness)); } if (false == livingNodes.isEmpty()) { @@ -415,12 +404,7 @@ static Iterable selectNodes(NodeTuple> nodeTuple, Map() { - @Override - public Iterator iterator() { - return new DeadNodeIteratorAdapter(selectedDeadNodes.iterator()); - } - }); + nodeSelector.select(() -> new DeadNodeIteratorAdapter(selectedDeadNodes.iterator())); if (false == selectedDeadNodes.isEmpty()) { return singletonList(Collections.min(selectedDeadNodes).node); } @@ -447,7 +431,7 @@ private void onResponse(Node node) { private void onFailure(Node node) { while(true) { DeadHostState previousDeadHostState = - blacklist.putIfAbsent(node.getHost(), new DeadHostState(TimeSupplier.DEFAULT)); + blacklist.putIfAbsent(node.getHost(), new DeadHostState(DeadHostState.DEFAULT_TIME_SUPPLIER)); if (previousDeadHostState == null) { if (logger.isDebugEnabled()) { logger.debug("added [" + node + "] to blacklist"); diff --git a/client/rest/src/main/java/org/elasticsearch/client/RestClientBuilder.java b/client/rest/src/main/java/org/elasticsearch/client/RestClientBuilder.java index 2337cbf1fd029..3deb1c8f151f1 100644 --- a/client/rest/src/main/java/org/elasticsearch/client/RestClientBuilder.java +++ b/client/rest/src/main/java/org/elasticsearch/client/RestClientBuilder.java @@ -186,12 +186,8 @@ public RestClient build() { if (failureListener == null) { failureListener = new RestClient.FailureListener(); } - CloseableHttpAsyncClient httpClient = AccessController.doPrivileged(new PrivilegedAction() { - @Override - public CloseableHttpAsyncClient run() { - return createHttpClient(); - } - }); + CloseableHttpAsyncClient httpClient = AccessController.doPrivileged( + (PrivilegedAction) this::createHttpClient); RestClient restClient = new RestClient(httpClient, defaultHeaders, nodes, pathPrefix, failureListener, nodeSelector, strictDeprecationMode); httpClient.start(); @@ -218,12 +214,7 @@ private CloseableHttpAsyncClient createHttpClient() { } final HttpAsyncClientBuilder finalBuilder = httpClientBuilder; - return AccessController.doPrivileged(new PrivilegedAction() { - @Override - public CloseableHttpAsyncClient run() { - return finalBuilder.build(); - } - }); + return AccessController.doPrivileged((PrivilegedAction) finalBuilder::build); } catch (NoSuchAlgorithmException e) { throw new IllegalStateException("could not create the default ssl context", e); } diff --git a/client/rest/src/test/java/org/elasticsearch/client/DeadHostStateTests.java b/client/rest/src/test/java/org/elasticsearch/client/DeadHostStateTests.java index 1b140928cf15a..8ba6cdc5e97a8 100644 --- a/client/rest/src/test/java/org/elasticsearch/client/DeadHostStateTests.java +++ b/client/rest/src/test/java/org/elasticsearch/client/DeadHostStateTests.java @@ -22,8 +22,6 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLong; -import org.elasticsearch.client.DeadHostState.TimeSupplier; - import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.greaterThan; @@ -38,14 +36,14 @@ public class DeadHostStateTests extends RestClientTestCase { private static long[] EXPECTED_TIMEOUTS_SECONDS = new long[]{60, 84, 120, 169, 240, 339, 480, 678, 960, 1357, 1800}; public void testInitialDeadHostStateDefaultTimeSupplier() { - DeadHostState deadHostState = new DeadHostState(DeadHostState.TimeSupplier.DEFAULT); + DeadHostState deadHostState = new DeadHostState(DeadHostState.DEFAULT_TIME_SUPPLIER); long currentTime = System.nanoTime(); assertThat(deadHostState.getDeadUntilNanos(), greaterThanOrEqualTo(currentTime)); assertThat(deadHostState.getFailedAttempts(), equalTo(1)); } public void testDeadHostStateFromPreviousDefaultTimeSupplier() { - DeadHostState previous = new DeadHostState(DeadHostState.TimeSupplier.DEFAULT); + DeadHostState previous = new DeadHostState(DeadHostState.DEFAULT_TIME_SUPPLIER); int iters = randomIntBetween(5, 30); for (int i = 0; i < iters; i++) { DeadHostState deadHostState = new DeadHostState(previous); @@ -58,10 +56,13 @@ public void testDeadHostStateFromPreviousDefaultTimeSupplier() { public void testCompareToTimeSupplier() { int numObjects = randomIntBetween(EXPECTED_TIMEOUTS_SECONDS.length, 30); DeadHostState[] deadHostStates = new DeadHostState[numObjects]; + final AtomicLong time = new AtomicLong(0); for (int i = 0; i < numObjects; i++) { if (i == 0) { - // this test requires a strictly increasing timer - deadHostStates[i] = new DeadHostState(new StrictMonotonicTimeSupplier()); + // this test requires a strictly increasing timer. This ensures that even if we call this time supplier in a very tight + // loop we always notice time moving forward. This does not happen for real timer implementations + // (e.g. on Linux clock_gettime provides microsecond resolution). + deadHostStates[i] = new DeadHostState(time::incrementAndGet); } else { deadHostStates[i] = new DeadHostState(deadHostStates[i - 1]); } @@ -74,42 +75,39 @@ public void testCompareToTimeSupplier() { public void testCompareToDifferingTimeSupplier() { try { - new DeadHostState(TimeSupplier.DEFAULT).compareTo( - new DeadHostState(new ConfigurableTimeSupplier())); + new DeadHostState(DeadHostState.DEFAULT_TIME_SUPPLIER).compareTo( + new DeadHostState(() -> 0L)); fail("expected failure"); } catch (IllegalArgumentException e) { - assertEquals("can't compare DeadHostStates with different clocks [nanoTime != configured[0]]", - e.getMessage()); + assertEquals("can't compare DeadHostStates holding different time suppliers as they may " + + "be based on different clocks", e.getMessage()); } } public void testShallBeRetried() { - ConfigurableTimeSupplier timeSupplier = new ConfigurableTimeSupplier(); + final AtomicLong time = new AtomicLong(0); DeadHostState deadHostState = null; for (int i = 0; i < EXPECTED_TIMEOUTS_SECONDS.length; i++) { long expectedTimeoutSecond = EXPECTED_TIMEOUTS_SECONDS[i]; - timeSupplier.nanoTime = 0; if (i == 0) { - deadHostState = new DeadHostState(timeSupplier); + deadHostState = new DeadHostState(time::get); } else { deadHostState = new DeadHostState(deadHostState); } for (int j = 0; j < expectedTimeoutSecond; j++) { - timeSupplier.nanoTime += TimeUnit.SECONDS.toNanos(1); + time.addAndGet(TimeUnit.SECONDS.toNanos(1)); assertThat(deadHostState.shallBeRetried(), is(false)); } int iters = randomIntBetween(5, 30); for (int j = 0; j < iters; j++) { - timeSupplier.nanoTime += TimeUnit.SECONDS.toNanos(1); + time.addAndGet(TimeUnit.SECONDS.toNanos(1)); assertThat(deadHostState.shallBeRetried(), is(true)); } } } public void testDeadHostStateTimeouts() { - ConfigurableTimeSupplier zeroTimeSupplier = new ConfigurableTimeSupplier(); - zeroTimeSupplier.nanoTime = 0L; - DeadHostState previous = new DeadHostState(zeroTimeSupplier); + DeadHostState previous = new DeadHostState(() -> 0L); for (long expectedTimeoutsSecond : EXPECTED_TIMEOUTS_SECONDS) { assertThat(TimeUnit.NANOSECONDS.toSeconds(previous.getDeadUntilNanos()), equalTo(expectedTimeoutsSecond)); previous = new DeadHostState(previous); @@ -123,37 +121,4 @@ public void testDeadHostStateTimeouts() { previous = deadHostState; } } - - static class ConfigurableTimeSupplier implements DeadHostState.TimeSupplier { - long nanoTime; - - @Override - public long nanoTime() { - return nanoTime; - } - - @Override - public String toString() { - return "configured[" + nanoTime + "]"; - } - } - - /** - * Simulates a monotonically strict increasing time (i.e. the value increases on every call to #nanoTime()). This ensures - * that even if we call this time supplier in a very tight loop we always notice time moving forward. This does not happen for real - * timer implementations (e.g. on Linux clock_gettime provides microsecond resolution). - */ - static class StrictMonotonicTimeSupplier implements DeadHostState.TimeSupplier { - private final AtomicLong time = new AtomicLong(0); - - @Override - public long nanoTime() { - return time.incrementAndGet(); - } - - @Override - public String toString() { - return "strict monotonic[" + time.get() + "]"; - } - } } diff --git a/client/rest/src/test/java/org/elasticsearch/client/RestClientTests.java b/client/rest/src/test/java/org/elasticsearch/client/RestClientTests.java index 6b5f8bf907eea..3cfb0f32d3052 100644 --- a/client/rest/src/test/java/org/elasticsearch/client/RestClientTests.java +++ b/client/rest/src/test/java/org/elasticsearch/client/RestClientTests.java @@ -25,7 +25,6 @@ import org.apache.http.impl.auth.BasicScheme; import org.apache.http.impl.client.BasicAuthCache; import org.apache.http.impl.nio.client.CloseableHttpAsyncClient; -import org.elasticsearch.client.DeadHostStateTests.ConfigurableTimeSupplier; import org.elasticsearch.client.RestClient.NodeTuple; import java.io.IOException; @@ -40,6 +39,8 @@ import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicLong; +import java.util.function.Supplier; import static java.util.Collections.singletonList; import static org.hamcrest.Matchers.instanceOf; @@ -266,14 +267,15 @@ public String toString() { // Mark all the nodes dead for a few test cases { - ConfigurableTimeSupplier timeSupplier = new ConfigurableTimeSupplier(); + final AtomicLong time = new AtomicLong(0L); + Supplier timeSupplier = time::get; Map blacklist = new HashMap<>(); blacklist.put(n1.getHost(), new DeadHostState(timeSupplier)); blacklist.put(n2.getHost(), new DeadHostState(new DeadHostState(timeSupplier))); blacklist.put(n3.getHost(), new DeadHostState(new DeadHostState(new DeadHostState(timeSupplier)))); /* - * case when fewer nodeTuple than blacklist, wont result in any IllegalCapacityException + * case when fewer nodeTuple than blacklist, won't result in any IllegalCapacityException */ { NodeTuple> fewerNodeTuple = new NodeTuple<>(Arrays.asList(n1, n2), null); @@ -282,7 +284,7 @@ public String toString() { } /* - * selectHosts will revive a single host if regardless of + * selectHosts will revive a single host regardless of * blacklist time. It'll revive the node that is closest * to being revived that the NodeSelector is ok with. */ @@ -304,7 +306,7 @@ public String toString() { * Now lets wind the clock forward, past the timeout for one of * the dead nodes. We should return it. */ - timeSupplier.nanoTime = new DeadHostState(timeSupplier).getDeadUntilNanos(); + time.set(new DeadHostState(timeSupplier).getDeadUntilNanos()); assertSelectLivingHosts(Arrays.asList(n1), nodeTuple, blacklist, NodeSelector.ANY); /* @@ -318,7 +320,7 @@ public String toString() { * blacklist timeouts then we function as though the nodes aren't * in the blacklist at all. */ - timeSupplier.nanoTime += DeadHostState.MAX_CONNECTION_TIMEOUT_NANOS; + time.addAndGet(DeadHostState.MAX_CONNECTION_TIMEOUT_NANOS); assertSelectLivingHosts(Arrays.asList(n1, n2, n3), nodeTuple, blacklist, NodeSelector.ANY); assertSelectLivingHosts(Arrays.asList(n2, n3), nodeTuple, blacklist, not1); } diff --git a/docs/java-rest/low-level/usage.asciidoc b/docs/java-rest/low-level/usage.asciidoc index ee1555019dbe1..06bd77c7710dd 100644 --- a/docs/java-rest/low-level/usage.asciidoc +++ b/docs/java-rest/low-level/usage.asciidoc @@ -14,7 +14,7 @@ The javadoc for the low level REST client can be found at {rest-client-javadoc}/ The low-level Java REST client is hosted on http://search.maven.org/#search%7Cga%7C1%7Cg%3A%22org.elasticsearch.client%22[Maven -Central]. The minimum Java version required is `1.7`. +Central]. The minimum Java version required is `1.8`. The low-level REST client is subject to the same release cycle as Elasticsearch. Replace the version with the desired client version, first