Skip to content

Commit

Permalink
Add RequestContent (Azure#21320)
Browse files Browse the repository at this point in the history
Added RequestContent and Updated HttpRequest to Use It
  • Loading branch information
alzimmermsft authored Jun 28, 2021
1 parent 155e942 commit edbf95c
Show file tree
Hide file tree
Showing 20 changed files with 1,216 additions and 123 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,15 @@ public class GoodLoggingCheck extends AbstractCheck {
private static final String CLIENT_LOGGER = "ClientLogger";
private static final String LOGGER = "logger";
private static final String STATIC_LOGGER_ERROR = "Use a static ClientLogger instance in a static method.";
private static final int[] REQUIRED_TOKENS = new int[]{
TokenTypes.IMPORT,
TokenTypes.INTERFACE_DEF,
TokenTypes.CLASS_DEF,
TokenTypes.LITERAL_NEW,
TokenTypes.VARIABLE_DEF,
TokenTypes.METHOD_CALL,
TokenTypes.METHOD_DEF
};

private static final String LOGGER_NAME_ERROR =
"ClientLogger instance naming: use ''%s'' instead of ''%s'' for consistency.";
Expand All @@ -42,7 +51,7 @@ public class GoodLoggingCheck extends AbstractCheck {
// Boolean indicator that indicates if the java class imports ClientLogger
private boolean hasClientLoggerImported;
// A LIFO queue stores the class names, pop top element if exist the class name AST node
private Queue<String> classNameDeque = Collections.asLifoQueue(new ArrayDeque<>());
private final Queue<String> classNameDeque = Collections.asLifoQueue(new ArrayDeque<>());
// Collection of Invalid logging packages
private static final Set<String> INVALID_LOGS = Collections.unmodifiableSet(new HashSet<>(Arrays.asList(
"org.slf4j", "org.apache.logging.log4j", "java.util.logging"
Expand All @@ -60,14 +69,7 @@ public int[] getAcceptableTokens() {

@Override
public int[] getRequiredTokens() {
return new int[] {
TokenTypes.IMPORT,
TokenTypes.CLASS_DEF,
TokenTypes.LITERAL_NEW,
TokenTypes.VARIABLE_DEF,
TokenTypes.METHOD_CALL,
TokenTypes.METHOD_DEF
};
return REQUIRED_TOKENS;
}

@Override
Expand Down Expand Up @@ -96,6 +98,7 @@ public void visitToken(DetailAST ast) {
});
break;
case TokenTypes.CLASS_DEF:
case TokenTypes.INTERFACE_DEF:
classNameDeque.offer(ast.findFirstToken(TokenTypes.IDENT).getText());
break;
case TokenTypes.LITERAL_NEW:
Expand Down Expand Up @@ -194,21 +197,20 @@ private void checkForInvalidStaticLoggerUsage(DetailAST methodDefToken) {
// if not a static method
if (!(TokenUtil.findFirstTokenByPredicate(methodDefToken,
node -> node.branchContains(TokenTypes.LITERAL_STATIC)).isPresent())) {

// error if static `LOGGER` present, LOGGER.*
if (methodDefToken.findFirstToken(TokenTypes.SLIST) != null) {
TokenUtil
.forEachChild(methodDefToken.findFirstToken(TokenTypes.SLIST), TokenTypes.EXPR, (exprToken) -> {
if (exprToken != null) {
DetailAST methodCallToken = exprToken.findFirstToken(TokenTypes.METHOD_CALL);
if (methodCallToken != null && methodCallToken.findFirstToken(TokenTypes.DOT) != null) {
if (methodCallToken.findFirstToken(TokenTypes.DOT)
.findFirstToken(TokenTypes.IDENT).getText().equals(LOGGER.toUpperCase())) {
log(methodDefToken, STATIC_LOGGER_ERROR);
}
TokenUtil.forEachChild(methodDefToken.findFirstToken(TokenTypes.SLIST), TokenTypes.EXPR, exprToken -> {
if (exprToken != null) {
DetailAST methodCallToken = exprToken.findFirstToken(TokenTypes.METHOD_CALL);
if (methodCallToken != null && methodCallToken.findFirstToken(TokenTypes.DOT) != null) {
if (methodCallToken.findFirstToken(TokenTypes.DOT)
.findFirstToken(TokenTypes.IDENT).getText().equals(LOGGER.toUpperCase())) {
log(methodDefToken, STATIC_LOGGER_ERROR);
}
}
});
}
});
}
}
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License.

/**
* Package containing implementation details.
*/
package com.azure.core.http.netty.implementation;
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Objects;
import java.util.function.Function;

/**
* HttpClient implementation for OkHttp.
Expand Down Expand Up @@ -64,9 +63,13 @@ public Mono<HttpResponse> send(HttpRequest request, Context context) {
// but block on the thread backing flux. This ignore any subscribeOn applied to send(r)
//
toOkHttpRequest(request).subscribe(okHttpRequest -> {
Call call = httpClient.newCall(okHttpRequest);
call.enqueue(new OkHttpCallback(sink, request, eagerlyReadResponse));
sink.onCancel(call::cancel);
try {
Call call = httpClient.newCall(okHttpRequest);
call.enqueue(new OkHttpCallback(sink, request, eagerlyReadResponse));
sink.onCancel(call::cancel);
} catch (Exception ex) {
sink.error(ex);
}
}, sink::error);
}));
}
Expand All @@ -78,29 +81,26 @@ public Mono<HttpResponse> send(HttpRequest request, Context context) {
* @return the Mono emitting okhttp request
*/
private static Mono<okhttp3.Request> toOkHttpRequest(HttpRequest request) {
return Mono.just(new okhttp3.Request.Builder())
.map(rb -> {
rb.url(request.getUrl());
if (request.getHeaders() != null) {
for (HttpHeader hdr : request.getHeaders()) {
// OkHttp allows for headers with multiple values, but it treats them as separate headers,
// therefore, we must call rb.addHeader for each value, using the same key for all of them
hdr.getValuesList().forEach(value -> rb.addHeader(hdr.getName(), value));
}
}
return rb;
})
.flatMap((Function<Request.Builder, Mono<Request.Builder>>) rb -> {
if (request.getHttpMethod() == HttpMethod.GET) {
return Mono.just(rb.get());
} else if (request.getHttpMethod() == HttpMethod.HEAD) {
return Mono.just(rb.head());
} else {
return toOkHttpRequestBody(request.getBody(), request.getHeaders())
.map(requestBody -> rb.method(request.getHttpMethod().toString(), requestBody));
}
})
.map(Request.Builder::build);
Request.Builder requestBuilder = new Request.Builder()
.url(request.getUrl());

if (request.getHeaders() != null) {
for (HttpHeader hdr : request.getHeaders()) {
// OkHttp allows for headers with multiple values, but it treats them as separate headers,
// therefore, we must call rb.addHeader for each value, using the same key for all of them
hdr.getValuesList().forEach(value -> requestBuilder.addHeader(hdr.getName(), value));
}
}

if (request.getHttpMethod() == HttpMethod.GET) {
return Mono.just(requestBuilder.get().build());
} else if (request.getHttpMethod() == HttpMethod.HEAD) {
return Mono.just(requestBuilder.head().build());
}

return toOkHttpRequestBody(request.getBody(), request.getHeaders())
.map(okhttpRequestBody -> requestBuilder.method(request.getHttpMethod().toString(), okhttpRequestBody)
.build());
}

/**
Expand All @@ -117,11 +117,9 @@ private static Mono<RequestBody> toOkHttpRequestBody(Flux<ByteBuffer> bbFlux, Ht

return bsMono.map(bs -> {
String contentType = headers.getValue("Content-Type");
if (contentType == null) {
return RequestBody.create(bs, null);
} else {
return RequestBody.create(bs, MediaType.parse(contentType));
}
MediaType mediaType = (contentType == null) ? null : MediaType.parse(contentType);

return RequestBody.create(bs, mediaType);
});
}

Expand All @@ -146,9 +144,7 @@ private static Mono<ByteString> toByteString(Flux<ByteBuffer> bbFlux) {
} catch (IOException ioe) {
throw Exceptions.propagate(ioe);
}
})
.map(b -> ByteString.of(b.readByteArray())),
okio.Buffer::clear)
}).map(b -> ByteString.of(b.readByteArray())), okio.Buffer::clear)
.switchIfEmpty(EMPTY_BYTE_STRING_MONO);
}

Expand All @@ -163,11 +159,13 @@ private static class OkHttpCallback implements okhttp3.Callback {
this.eagerlyReadResponse = eagerlyReadResponse;
}

@SuppressWarnings("NullableProblems")
@Override
public void onFailure(okhttp3.Call call, IOException e) {
sink.error(e);
}

@SuppressWarnings("NullableProblems")
@Override
public void onResponse(okhttp3.Call call, okhttp3.Response response) {
/*
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License.

/**
* Package containing implementation details.
*/
package com.azure.core.http.okhttp.implementation;
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Test;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
Expand All @@ -40,6 +39,7 @@
import static com.github.tomakehurst.wiremock.client.WireMock.aResponse;
import static com.github.tomakehurst.wiremock.client.WireMock.get;
import static com.github.tomakehurst.wiremock.client.WireMock.post;
import static org.junit.jupiter.api.Assertions.assertArrayEquals;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertLinesMatch;
import static org.junit.jupiter.api.Assertions.assertNotNull;
Expand All @@ -66,6 +66,7 @@ public static void beforeClass() {
server.stubFor(post("/shortPost").willReturn(aResponse().withBody(SHORT_BODY)));
server.stubFor(get(RETURN_HEADERS_AS_IS_PATH).willReturn(aResponse()
.withTransformers(OkHttpAsyncHttpClientResponseTransformer.NAME)));

server.start();
}

Expand All @@ -87,25 +88,33 @@ public void testFlowableResponseLongBodyAsByteArrayAsync() {
}

@Test
@Disabled("This tests behaviour of reactor netty's ByteBufFlux, not applicable for OkHttp")
public void testMultipleSubscriptionsEmitsError() {
HttpResponse response = getResponse("/short");

// Subscription:1
response.getBodyAsByteArray().block();
StepVerifier.create(response.getBodyAsByteArray())
.assertNext(Assertions::assertNotNull)
.expectComplete()
.verify(Duration.ofSeconds(20));

// Subscription:2
// Getting the bytes of an OkHttp response closes the stream on first read.
// Subsequent reads will return an IllegalStateException due to the stream being closed.
StepVerifier.create(response.getBodyAsByteArray())
.expectNextCount(0) // TODO: Check with smaldini, what is the verifier operator equivalent to .awaitDone(20, TimeUnit.SECONDS)
.verifyError(IllegalStateException.class);
.expectNextCount(0)
.expectError(IllegalStateException.class)
.verify(Duration.ofSeconds(20));

}

@Test
public void testFlowableWhenServerReturnsBodyAndNoErrorsWhenHttp500Returned() {
HttpResponse response = getResponse("/error");
StepVerifier.create(response.getBodyAsString())
.expectNext("error") // TODO: .awaitDone(20, TimeUnit.SECONDS) [See previous todo]
.verifyComplete();
assertEquals(500, response.getStatusCode());
StepVerifier.create(response.getBodyAsString())
.expectNext("error")
.expectComplete()
.verify(Duration.ofSeconds(20));
}

@Test
Expand All @@ -128,7 +137,7 @@ public void testFlowableBackpressure() {

@Test
public void testRequestBodyIsErrorShouldPropagateToResponse() {
HttpClient client = HttpClient.createDefault();
HttpClient client = new OkHttpAsyncClientProvider().createInstance();
HttpRequest request = new HttpRequest(HttpMethod.POST, url(server, "/shortPost"))
.setHeader("Content-Length", "123")
.setBody(Flux.error(new RuntimeException("boo")));
Expand All @@ -140,7 +149,7 @@ public void testRequestBodyIsErrorShouldPropagateToResponse() {

@Test
public void testRequestBodyEndsInErrorShouldPropagateToResponse() {
HttpClient client = HttpClient.createDefault();
HttpClient client = new OkHttpAsyncClientProvider().createInstance();
String contentChunk = "abcdefgh";
int repetitions = 1000;
HttpRequest request = new HttpRequest(HttpMethod.POST, url(server, "/shortPost"))
Expand All @@ -149,10 +158,14 @@ public void testRequestBodyEndsInErrorShouldPropagateToResponse() {
.repeat(repetitions)
.map(s -> ByteBuffer.wrap(s.getBytes(StandardCharsets.UTF_8)))
.concatWith(Flux.error(new RuntimeException("boo"))));
StepVerifier.create(client.send(request))
// .awaitDone(10, TimeUnit.SECONDS)
.expectErrorMessage("boo")
.verify();

try {
StepVerifier.create(client.send(request))
.expectErrorMessage("boo")
.verify(Duration.ofSeconds(10));
} catch (Exception ex) {
assertEquals("boo", ex.getMessage());
}
}

@Test
Expand Down Expand Up @@ -200,42 +213,30 @@ public void testServerShutsDownSocketShouldPushErrorToContentFlowable() {
});
}

@Disabled("This flakey test fails often on MacOS. https://github.com/Azure/azure-sdk-for-java/issues/4357.")
@Test
public void testConcurrentRequests() throws NoSuchAlgorithmException {
int numRequests = 100; // 100 = 1GB of data read
HttpClient client = HttpClient.createDefault();
HttpClient client = new OkHttpAsyncClientProvider().createInstance();
byte[] expectedDigest = digest(LONG_BODY);
long expectedByteCount = (long) numRequests * LONG_BODY.getBytes(StandardCharsets.UTF_8).length;

Mono<Long> numBytesMono = Flux.range(1, numRequests)
.parallel(10)
.runOn(Schedulers.boundedElastic())
.flatMap(n -> Mono.fromCallable(() -> getResponse(client, "/long")).flatMapMany(response -> {
MessageDigest md = md5Digest();
return response.getBody()
.doOnNext(md::update)
.map(bb -> new NumberedByteBuffer(n, bb))
// .doOnComplete(() -> System.out.println("completed " + n))
.doOnComplete(() -> Assertions.assertArrayEquals(expectedDigest,
md.digest(), "wrong digest!"));
.doOnNext(buffer -> md.update(buffer.duplicate()))
.doOnComplete(() -> assertArrayEquals(expectedDigest, md.digest(), "wrong digest!"));
}))
.sequential()
// enable the doOnNext call to see request numbers and thread names
// .doOnNext(g -> System.out.println(g.n + " " +
// Thread.currentThread().getName()))
.map(nbb -> (long) nbb.bb.limit())
.reduce(Long::sum)
.subscribeOn(Schedulers.boundedElastic());
.map(buffer -> (long) buffer.remaining())
.reduce(Long::sum);

StepVerifier.create(numBytesMono)
// .awaitDone(timeoutSeconds, TimeUnit.SECONDS)
.expectNext((long) (numRequests * LONG_BODY.getBytes(StandardCharsets.UTF_8).length))
.verifyComplete();
//
// long numBytes = numBytesMono.block();
// t = System.currentTimeMillis() - t;
// System.out.println("totalBytesRead=" + numBytes / 1024 / 1024 + "MB in " + t / 1000.0 + "s");
// assertEquals(numRequests * LONG_BODY.getBytes(StandardCharsets.UTF_8).length, numBytes);
.expectNext(expectedByteCount)
.expectComplete()
.verify(Duration.ofSeconds(60));
}

@Test
Expand Down Expand Up @@ -284,16 +285,6 @@ private static byte[] digest(String s) throws NoSuchAlgorithmException {
return md.digest();
}

private static final class NumberedByteBuffer {
final long n;
final ByteBuffer bb;

NumberedByteBuffer(long n, ByteBuffer bb) {
this.n = n;
this.bb = bb;
}
}

private static HttpResponse getResponse(String path) {
HttpClient client = new OkHttpAsyncHttpClientBuilder().build();
return getResponse(client, path);
Expand Down
1 change: 1 addition & 0 deletions sdk/core/azure-core/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -265,6 +265,7 @@
--add-opens com.azure.core/com.azure.core.implementation.models.jsonflatten=com.fasterxml.jackson.databind
--add-opens com.azure.core/com.azure.core.implementation.models.jsonflatten=ALL-UNNAMED
--add-opens com.azure.core/com.azure.core.implementation.serializer=ALL-UNNAMED
--add-opens com.azure.core/com.azure.core.implementation.util=ALL-UNNAMED
--add-opens com.azure.core/com.azure.core.models=ALL-UNNAMED
--add-opens com.azure.core/com.azure.core.util=ALL-UNNAMED
--add-opens com.azure.core/com.azure.core.util.jsonpatch=ALL-UNNAMED
Expand Down
Loading

0 comments on commit edbf95c

Please sign in to comment.