Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Retry ConnectException, add retry logging #6614

Merged
merged 4 commits into from
Aug 9, 2024
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@

package io.opentelemetry.exporter.sender.jdk.internal;

import static java.util.stream.Collectors.joining;

import io.opentelemetry.exporter.internal.compression.Compressor;
import io.opentelemetry.exporter.internal.http.HttpSender;
import io.opentelemetry.exporter.internal.marshal.Marshaler;
Expand All @@ -25,6 +27,7 @@
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.StringJoiner;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutorService;
Expand All @@ -33,6 +36,8 @@
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import java.util.function.Supplier;
import java.util.logging.Level;
import java.util.logging.Logger;
import javax.annotation.Nullable;
import javax.net.ssl.SSLContext;
import javax.net.ssl.SSLException;
Expand All @@ -52,6 +57,8 @@
private static final ThreadLocal<ByteBufferPool> threadLocalByteBufPool =
ThreadLocal.withInitial(ByteBufferPool::new);

private static final Logger logger = Logger.getLogger(JdkHttpSender.class.getName());

private final ExecutorService executorService = Executors.newFixedThreadPool(5);
private final HttpClient client;
private final URI uri;
Expand Down Expand Up @@ -211,11 +218,37 @@
exception = e;
}

if (httpResponse != null && !retryableStatusCodes.contains(httpResponse.statusCode())) {
return httpResponse;
if (httpResponse != null) {
boolean retryable = retryableStatusCodes.contains(httpResponse.statusCode());
if (logger.isLoggable(Level.FINER)) {
logger.log(

Check warning on line 224 in exporters/sender/jdk/src/main/java/io/opentelemetry/exporter/sender/jdk/internal/JdkHttpSender.java

View check run for this annotation

Codecov / codecov/patch

exporters/sender/jdk/src/main/java/io/opentelemetry/exporter/sender/jdk/internal/JdkHttpSender.java#L224

Added line #L224 was not covered by tests
Level.FINER,
"Attempt "
+ attempt
+ " returned "
+ (retryable ? "retryable" : "non-retryable")
+ " response: "
+ responseStringRepresentation(httpResponse));

Check warning on line 231 in exporters/sender/jdk/src/main/java/io/opentelemetry/exporter/sender/jdk/internal/JdkHttpSender.java

View check run for this annotation

Codecov / codecov/patch

exporters/sender/jdk/src/main/java/io/opentelemetry/exporter/sender/jdk/internal/JdkHttpSender.java#L231

Added line #L231 was not covered by tests
}
if (!retryable) {
return httpResponse;
}
}
if (exception != null && !isRetryableException(exception)) {
throw exception;
if (exception != null) {
boolean retryable = isRetryableException(exception);
if (logger.isLoggable(Level.FINER)) {
logger.log(

Check warning on line 240 in exporters/sender/jdk/src/main/java/io/opentelemetry/exporter/sender/jdk/internal/JdkHttpSender.java

View check run for this annotation

Codecov / codecov/patch

exporters/sender/jdk/src/main/java/io/opentelemetry/exporter/sender/jdk/internal/JdkHttpSender.java#L240

Added line #L240 was not covered by tests
Level.FINER,
"Attempt "
+ attempt
+ " failed with "
+ (retryable ? "retryable" : "non-retryable")
+ " exception",
exception);
}
if (!retryable) {
throw exception;
}
}
} while (attempt < retryPolicy.getMaxAttempts());

Expand All @@ -225,6 +258,17 @@
throw exception;
}

private static String responseStringRepresentation(HttpResponse<?> response) {
StringJoiner joiner = new StringJoiner(",", "HttpResponse{", "}");
joiner.add("code=" + response.statusCode());
joiner.add(

Check warning on line 264 in exporters/sender/jdk/src/main/java/io/opentelemetry/exporter/sender/jdk/internal/JdkHttpSender.java

View check run for this annotation

Codecov / codecov/patch

exporters/sender/jdk/src/main/java/io/opentelemetry/exporter/sender/jdk/internal/JdkHttpSender.java#L262-L264

Added lines #L262 - L264 were not covered by tests
"headers="
+ response.headers().map().entrySet().stream()
.map(entry -> entry.getKey() + "=" + String.join(",", entry.getValue()))
.collect(joining(",", "[", "]")));
return joiner.toString();

Check warning on line 269 in exporters/sender/jdk/src/main/java/io/opentelemetry/exporter/sender/jdk/internal/JdkHttpSender.java

View check run for this annotation

Codecov / codecov/patch

exporters/sender/jdk/src/main/java/io/opentelemetry/exporter/sender/jdk/internal/JdkHttpSender.java#L266-L269

Added lines #L266 - L269 were not covered by tests
}

private void write(Marshaler marshaler, OutputStream os) throws IOException {
if (exportAsJson) {
marshaler.writeJsonTo(os);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@
import io.opentelemetry.exporter.internal.marshal.Serializer;
import io.opentelemetry.sdk.common.export.RetryPolicy;
import java.io.IOException;
import java.net.ConnectException;
import java.net.ServerSocket;
import java.net.http.HttpClient;
import java.net.http.HttpConnectTimeoutException;
import java.time.Duration;
Expand Down Expand Up @@ -53,8 +55,8 @@ void setup() throws IOException, InterruptedException {
sender =
new JdkHttpSender(
mockHttpClient,
"http://10.255.255.1", // Connecting to a non-routable IP address to trigger connection
// timeout
// Connecting to a non-routable IP address to trigger connection timeout
"http://10.255.255.1",
null,
false,
"text/plain",
Expand All @@ -74,6 +76,44 @@ void sendInternal_RetryableConnectTimeoutException() throws IOException, Interru
verify(mockHttpClient, times(2)).send(any(), any());
}

@Test
void sendInternal_RetryableConnectException() throws IOException, InterruptedException {
sender =
new JdkHttpSender(
mockHttpClient,
// Connecting to localhost on an unused port address to trigger
// java.net.ConnectException (or java.net.http.HttpConnectTimeoutException on linux java
// 11+)
"http://localhost:" + freePort(),
null,
false,
"text/plain",
Duration.ofSeconds(10).toNanos(),
Collections::emptyMap,
RetryPolicy.builder()
.setMaxAttempts(2)
.setInitialBackoff(Duration.ofMillis(1))
.build());

assertThatThrownBy(() -> sender.sendInternal(new NoOpMarshaler()))
.satisfies(
e ->
assertThat(
(e instanceof ConnectException)
|| (e instanceof HttpConnectTimeoutException))
.isTrue());

verify(mockHttpClient, times(2)).send(any(), any());
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

😎 solid test!


private static int freePort() {
try (ServerSocket socket = new ServerSocket(0)) {
return socket.getLocalPort();
} catch (IOException e) {
throw new RuntimeException(e);
}
}

@Test
void sendInternal_RetryableIoException() throws IOException, InterruptedException {
doThrow(new IOException("error!")).when(mockHttpClient).send(any(), any());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,19 @@

package io.opentelemetry.exporter.sender.okhttp.internal;

import static java.util.stream.Collectors.joining;

import io.opentelemetry.sdk.common.export.RetryPolicy;
import java.io.IOException;
import java.net.ConnectException;
import java.net.SocketTimeoutException;
import java.util.Locale;
import java.util.StringJoiner;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import java.util.logging.Level;
import java.util.logging.Logger;
import okhttp3.Interceptor;
import okhttp3.Response;

Expand All @@ -23,6 +29,8 @@
*/
public final class RetryInterceptor implements Interceptor {

private static final Logger logger = Logger.getLogger(RetryInterceptor.class.getName());

private final RetryPolicy retryPolicy;
private final Function<Response, Boolean> isRetryable;
private final Function<IOException, Boolean> isRetryableException;
Expand Down Expand Up @@ -84,12 +92,39 @@
} catch (IOException e) {
exception = e;
}
if (response != null && !Boolean.TRUE.equals(isRetryable.apply(response))) {
return response;
if (response != null) {
boolean retryable = Boolean.TRUE.equals(isRetryable.apply(response));
if (logger.isLoggable(Level.FINER)) {
logger.log(

Check warning on line 98 in exporters/sender/okhttp/src/main/java/io/opentelemetry/exporter/sender/okhttp/internal/RetryInterceptor.java

View check run for this annotation

Codecov / codecov/patch

exporters/sender/okhttp/src/main/java/io/opentelemetry/exporter/sender/okhttp/internal/RetryInterceptor.java#L98

Added line #L98 was not covered by tests
Level.FINER,
"Attempt "
+ attempt
+ " returned "
+ (retryable ? "retryable" : "non-retryable")
+ " response: "
+ responseStringRepresentation(response));

Check warning on line 105 in exporters/sender/okhttp/src/main/java/io/opentelemetry/exporter/sender/okhttp/internal/RetryInterceptor.java

View check run for this annotation

Codecov / codecov/patch

exporters/sender/okhttp/src/main/java/io/opentelemetry/exporter/sender/okhttp/internal/RetryInterceptor.java#L105

Added line #L105 was not covered by tests
}
if (!retryable) {
return response;
}
}
if (exception != null && !Boolean.TRUE.equals(isRetryableException.apply(exception))) {
throw exception;
if (exception != null) {
boolean retryable = Boolean.TRUE.equals(isRetryableException.apply(exception));
if (logger.isLoggable(Level.FINER)) {
logger.log(

Check warning on line 114 in exporters/sender/okhttp/src/main/java/io/opentelemetry/exporter/sender/okhttp/internal/RetryInterceptor.java

View check run for this annotation

Codecov / codecov/patch

exporters/sender/okhttp/src/main/java/io/opentelemetry/exporter/sender/okhttp/internal/RetryInterceptor.java#L114

Added line #L114 was not covered by tests
Level.FINER,
"Attempt "
+ attempt
+ " failed with "
+ (retryable ? "retryable" : "non-retryable")
+ " exception",
exception);
}
if (!retryable) {
throw exception;
}
}

} while (attempt < retryPolicy.getMaxAttempts());

if (response != null) {
Expand All @@ -98,15 +133,31 @@
throw exception;
}

private static String responseStringRepresentation(Response response) {
StringJoiner joiner = new StringJoiner(",", "Response{", "}");
joiner.add("code=" + response.code());
joiner.add(

Check warning on line 139 in exporters/sender/okhttp/src/main/java/io/opentelemetry/exporter/sender/okhttp/internal/RetryInterceptor.java

View check run for this annotation

Codecov / codecov/patch

exporters/sender/okhttp/src/main/java/io/opentelemetry/exporter/sender/okhttp/internal/RetryInterceptor.java#L137-L139

Added lines #L137 - L139 were not covered by tests
"headers="
+ response.headers().toMultimap().entrySet().stream()
.map(entry -> entry.getKey() + "=" + String.join(",", entry.getValue()))
.collect(joining(",", "[", "]")));
return joiner.toString();

Check warning on line 144 in exporters/sender/okhttp/src/main/java/io/opentelemetry/exporter/sender/okhttp/internal/RetryInterceptor.java

View check run for this annotation

Codecov / codecov/patch

exporters/sender/okhttp/src/main/java/io/opentelemetry/exporter/sender/okhttp/internal/RetryInterceptor.java#L141-L144

Added lines #L141 - L144 were not covered by tests
}

// Visible for testing
static boolean isRetryableException(IOException e) {
if (!(e instanceof SocketTimeoutException)) {
return false;
if (e instanceof SocketTimeoutException) {
String message = e.getMessage();
// Connect timeouts can produce SocketTimeoutExceptions with no message, or with "connect
// timed
// out"
jack-berg marked this conversation as resolved.
Show resolved Hide resolved
return message == null || message.toLowerCase(Locale.ROOT).contains("connect timed out");
} else if (e instanceof ConnectException) {
// Exceptions resemble: java.net.ConnectException: Failed to connect to
// localhost/[0:0:0:0:0:0:0:1]:62611
return true;
}
String message = e.getMessage();
// Connect timeouts can produce SocketTimeoutExceptions with no message, or with "connect timed
// out"
return message == null || message.toLowerCase(Locale.ROOT).contains("connect timed out");
return false;
}

// Visible for testing
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@
import com.linecorp.armeria.testing.junit5.server.mock.MockWebServerExtension;
import io.opentelemetry.sdk.common.export.RetryPolicy;
import java.io.IOException;
import java.net.ConnectException;
import java.net.ServerSocket;
import java.net.SocketTimeoutException;
import java.time.Duration;
import java.util.concurrent.TimeUnit;
Expand Down Expand Up @@ -157,6 +159,34 @@ void connectTimeout() throws Exception {
verify(sleeper, times(4)).sleep(anyLong());
}

@Test
void connectException() throws Exception {
client = connectTimeoutClient();
when(random.get(anyLong())).thenReturn(1L);
doNothing().when(sleeper).sleep(anyLong());

// Connecting to localhost on an unused port address to trigger java.net.ConnectException
int openPort = freePort();
assertThatThrownBy(
() ->
client
.newCall(new Request.Builder().url("http://localhost:" + openPort).build())
.execute())
.isInstanceOf(ConnectException.class);

verify(isRetryableException, times(5)).apply(any());
// Should retry maxAttempts, and sleep maxAttempts - 1 times
verify(sleeper, times(4)).sleep(anyLong());
}

private static int freePort() {
try (ServerSocket socket = new ServerSocket(0)) {
return socket.getLocalPort();
} catch (IOException e) {
throw new RuntimeException(e);
}
}

@Test
void nonRetryableException() throws InterruptedException {
client = connectTimeoutClient();
Expand Down
Loading