From e52686f39535b4edddf80fd9dc262017ac8e88d6 Mon Sep 17 00:00:00 2001 From: Weidong Xu Date: Fri, 12 Jun 2020 10:51:53 +0800 Subject: [PATCH] mgmt, use Context in LRO (#11854) --- .../implementation/polling/PollOperation.java | 9 +- .../polling/LROPollerTests.java | 101 +++++++++++++----- 2 files changed, 83 insertions(+), 27 deletions(-) diff --git a/sdk/core/azure-core-management/src/main/java/com/azure/core/management/implementation/polling/PollOperation.java b/sdk/core/azure-core-management/src/main/java/com/azure/core/management/implementation/polling/PollOperation.java index a0d966ee78bc5..1d97fecce213a 100644 --- a/sdk/core/azure-core-management/src/main/java/com/azure/core/management/implementation/polling/PollOperation.java +++ b/sdk/core/azure-core-management/src/main/java/com/azure/core/management/implementation/polling/PollOperation.java @@ -8,6 +8,7 @@ import com.azure.core.http.HttpRequest; import com.azure.core.http.HttpResponse; import com.azure.core.management.polling.PollResult; +import com.azure.core.util.FluxUtil; import com.azure.core.util.logging.ClientLogger; import com.azure.core.util.polling.LongRunningOperationStatus; import com.azure.core.util.polling.PollResponse; @@ -94,7 +95,9 @@ public static Function>, Mono> fetchResul U result = deserialize(serializerAdapter, value, finalResultType); return result != null ? Mono.just(result) : Mono.empty(); } else { - return pipeline.send(decorateRequest(new HttpRequest(HttpMethod.GET, finalResult.getResultUri()))) + return FluxUtil.fluxContext(fluxContext -> + pipeline.send(decorateRequest(new HttpRequest(HttpMethod.GET, finalResult.getResultUri())), + fluxContext).flux()).next() .flatMap((Function>) response -> response.getBodyAsString()) .flatMap(body -> { U result = deserialize(serializerAdapter, body, finalResultType); @@ -148,7 +151,9 @@ private static Mono>> pollResponseMono(Serializer * @return a Mono emitting PollingState updated from the poll operation response */ private static Mono doSinglePoll(HttpPipeline pipeline, PollingState pollingState) { - return pipeline.send(decorateRequest(new HttpRequest(HttpMethod.GET, pollingState.getPollUrl()))) + return FluxUtil.fluxContext(fluxContext -> + pipeline.send(decorateRequest(new HttpRequest(HttpMethod.GET, pollingState.getPollUrl())), + fluxContext).flux()).next() .flatMap((Function>) response -> response.getBodyAsString() .map(body -> pollingState.update(response.getStatusCode(), response.getHeaders(), body)) .switchIfEmpty(Mono.defer(() -> { diff --git a/sdk/core/azure-core-management/src/test/java/com/azure/core/management/implementation/polling/LROPollerTests.java b/sdk/core/azure-core-management/src/test/java/com/azure/core/management/implementation/polling/LROPollerTests.java index 404dc24fd9543..df18762991e28 100644 --- a/sdk/core/azure-core-management/src/test/java/com/azure/core/management/implementation/polling/LROPollerTests.java +++ b/sdk/core/azure-core-management/src/test/java/com/azure/core/management/implementation/polling/LROPollerTests.java @@ -19,6 +19,8 @@ import com.azure.core.management.polling.PollResult; import com.azure.core.management.polling.PollerFactory; import com.azure.core.management.serializer.AzureJacksonAdapter; +import com.azure.core.util.Context; +import com.azure.core.util.FluxUtil; import com.azure.core.util.polling.AsyncPollResponse; import com.azure.core.util.polling.LongRunningOperationStatus; import com.azure.core.util.polling.PollerFlux; @@ -45,12 +47,15 @@ import reactor.test.StepVerifier; import java.io.IOException; -import java.lang.reflect.Type; import java.net.MalformedURLException; import java.net.URL; import java.nio.ByteBuffer; import java.time.Duration; import java.time.OffsetDateTime; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.Optional; import java.util.UUID; import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicInteger; @@ -76,7 +81,7 @@ public void afterTest() { @ServiceInterface(name = "ProvisioningStateLroService") interface ProvisioningStateLroServiceClient { @Put("/resource/1") - Mono>> startLro(); + Mono>> startLro(Context context); } @Test @@ -94,7 +99,7 @@ public void lroBasedOnProvisioningState() { FooWithProvisioningState.class, FooWithProvisioningState.class, POLLING_DURATION, - newLroInitFunction(client, FooWithProvisioningState.class)); + newLroInitFunction(client)); int[] onNextCallCount = new int[1]; lroFlux.doOnNext(response -> { @@ -168,7 +173,7 @@ public String getName() { Resource.class, Resource.class, POLLING_DURATION, - newLroInitFunction(client, Resource.class)); + newLroInitFunction(client)); StepVerifier.create(lroFlux) .expectSubscription() @@ -245,7 +250,7 @@ public String getName() { FooWithProvisioningState.class, FooWithProvisioningState.class, POLLING_DURATION, - newLroInitFunction(client, FooWithProvisioningState.class)); + newLroInitFunction(client)); Mono resultMonoWithTimeout = lroFlux.last() .flatMap(AsyncPollResponse::getFinalResult) @@ -292,7 +297,7 @@ public void lroRetryAfter() { FooWithProvisioningState.class, FooWithProvisioningState.class, POLLING_DURATION, - newLroInitFunction(client, FooWithProvisioningState.class)); + newLroInitFunction(client)); long nanoTime = System.nanoTime(); @@ -313,6 +318,46 @@ public void lroRetryAfter() { } } + @Test + public void lroContext() { + WireMockServer lroServer = startServer(); + lroServer.start(); + + HttpPipelinePolicy contextVerifyPolicy = (context, next) -> { + Optional valueOpt = context.getData("key1"); + if (valueOpt.isPresent() && "value1".equals(valueOpt.get())) { + return next.process(); + } else { + return Mono.error(new AssertionError()); + } + }; + + try { + final ProvisioningStateLroServiceClient client = RestProxy.create(ProvisioningStateLroServiceClient.class, + createHttpPipeline(lroServer.port(), Collections.singletonList(contextVerifyPolicy)), + SERIALIZER); + + Flux, FooWithProvisioningState>> lroFlux + = PollerFactory.create(SERIALIZER, + new HttpPipelineBuilder().build(), + FooWithProvisioningState.class, + FooWithProvisioningState.class, + POLLING_DURATION, + newLroInitFunction(client)); + lroFlux = lroFlux.subscriberContext(context -> context.put("key1", "value1")); + + FooWithProvisioningState result = lroFlux + .blockLast() + .getFinalResult() + .block(); + Assertions.assertNotNull(result); + } finally { + if (lroServer.isRunning()) { + lroServer.shutdown(); + } + } + } + private static class ServerConfigure { private int pollingCountTillSuccess = 2; private HttpHeaders additionalHeaders = HttpHeaders.noHeaders(); @@ -394,30 +439,36 @@ private static WireMockServer createServer(ResponseTransformer transformer, } private static HttpPipeline createHttpPipeline(int port) { - return new HttpPipelineBuilder() - .policies(new HttpPipelinePolicy() { - @Override - public Mono process(HttpPipelineCallContext context, - HttpPipelineNextPolicy next) { - HttpRequest request = context.getHttpRequest(); - request.setUrl(updatePort(request.getUrl(), port)); - context.setHttpRequest(request); - return next.process(); - } + return createHttpPipeline(port, Collections.emptyList()); + } - private URL updatePort(URL url, int port) { - try { - return new URL(url.getProtocol(), url.getHost(), port, url.getFile()); - } catch (MalformedURLException mue) { - throw new RuntimeException(mue); - } + private static HttpPipeline createHttpPipeline(int port, List additionalPolicies) { + List policies = new ArrayList<>(additionalPolicies); + policies.add(new HttpPipelinePolicy() { + @Override + public Mono process(HttpPipelineCallContext context, + HttpPipelineNextPolicy next) { + HttpRequest request = context.getHttpRequest(); + request.setUrl(updatePort(request.getUrl(), port)); + context.setHttpRequest(request); + return next.process(); + } + + private URL updatePort(URL url, int port) { + try { + return new URL(url.getProtocol(), url.getHost(), port, url.getFile()); + } catch (MalformedURLException mue) { + throw new RuntimeException(mue); } - }) + } + }); + return new HttpPipelineBuilder() + .policies(policies.toArray(new HttpPipelinePolicy[0])) .build(); } - private Mono>> newLroInitFunction(ProvisioningStateLroServiceClient client, Type type) { - return client.startLro(); + private Mono>> newLroInitFunction(ProvisioningStateLroServiceClient client) { + return FluxUtil.fluxContext(context -> client.startLro(context).flux()).next(); } private static String toJson(Object object) {