Skip to content

Commit

Permalink
mgmt, use Context in LRO (#11854)
Browse files Browse the repository at this point in the history
  • Loading branch information
weidongxu-microsoft authored Jun 12, 2020
1 parent a3a7dcb commit e52686f
Show file tree
Hide file tree
Showing 2 changed files with 83 additions and 27 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
import com.azure.core.http.HttpRequest;
import com.azure.core.http.HttpResponse;
import com.azure.core.management.polling.PollResult;
import com.azure.core.util.FluxUtil;
import com.azure.core.util.logging.ClientLogger;
import com.azure.core.util.polling.LongRunningOperationStatus;
import com.azure.core.util.polling.PollResponse;
Expand Down Expand Up @@ -94,7 +95,9 @@ public static <T, U> Function<PollingContext<PollResult<T>>, Mono<U>> fetchResul
U result = deserialize(serializerAdapter, value, finalResultType);
return result != null ? Mono.just(result) : Mono.empty();
} else {
return pipeline.send(decorateRequest(new HttpRequest(HttpMethod.GET, finalResult.getResultUri())))
return FluxUtil.fluxContext(fluxContext ->
pipeline.send(decorateRequest(new HttpRequest(HttpMethod.GET, finalResult.getResultUri())),
fluxContext).flux()).next()
.flatMap((Function<HttpResponse, Mono<String>>) response -> response.getBodyAsString())
.flatMap(body -> {
U result = deserialize(serializerAdapter, body, finalResultType);
Expand Down Expand Up @@ -148,7 +151,9 @@ private static <T> Mono<PollResponse<PollResult<T>>> pollResponseMono(Serializer
* @return a Mono emitting PollingState updated from the poll operation response
*/
private static Mono<PollingState> doSinglePoll(HttpPipeline pipeline, PollingState pollingState) {
return pipeline.send(decorateRequest(new HttpRequest(HttpMethod.GET, pollingState.getPollUrl())))
return FluxUtil.fluxContext(fluxContext ->
pipeline.send(decorateRequest(new HttpRequest(HttpMethod.GET, pollingState.getPollUrl())),
fluxContext).flux()).next()
.flatMap((Function<HttpResponse, Mono<PollingState>>) response -> response.getBodyAsString()
.map(body -> pollingState.update(response.getStatusCode(), response.getHeaders(), body))
.switchIfEmpty(Mono.defer(() -> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@
import com.azure.core.management.polling.PollResult;
import com.azure.core.management.polling.PollerFactory;
import com.azure.core.management.serializer.AzureJacksonAdapter;
import com.azure.core.util.Context;
import com.azure.core.util.FluxUtil;
import com.azure.core.util.polling.AsyncPollResponse;
import com.azure.core.util.polling.LongRunningOperationStatus;
import com.azure.core.util.polling.PollerFlux;
Expand All @@ -45,12 +47,15 @@
import reactor.test.StepVerifier;

import java.io.IOException;
import java.lang.reflect.Type;
import java.net.MalformedURLException;
import java.net.URL;
import java.nio.ByteBuffer;
import java.time.Duration;
import java.time.OffsetDateTime;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Optional;
import java.util.UUID;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
Expand All @@ -76,7 +81,7 @@ public void afterTest() {
@ServiceInterface(name = "ProvisioningStateLroService")
interface ProvisioningStateLroServiceClient {
@Put("/resource/1")
Mono<Response<Flux<ByteBuffer>>> startLro();
Mono<Response<Flux<ByteBuffer>>> startLro(Context context);
}

@Test
Expand All @@ -94,7 +99,7 @@ public void lroBasedOnProvisioningState() {
FooWithProvisioningState.class,
FooWithProvisioningState.class,
POLLING_DURATION,
newLroInitFunction(client, FooWithProvisioningState.class));
newLroInitFunction(client));

int[] onNextCallCount = new int[1];
lroFlux.doOnNext(response -> {
Expand Down Expand Up @@ -168,7 +173,7 @@ public String getName() {
Resource.class,
Resource.class,
POLLING_DURATION,
newLroInitFunction(client, Resource.class));
newLroInitFunction(client));

StepVerifier.create(lroFlux)
.expectSubscription()
Expand Down Expand Up @@ -245,7 +250,7 @@ public String getName() {
FooWithProvisioningState.class,
FooWithProvisioningState.class,
POLLING_DURATION,
newLroInitFunction(client, FooWithProvisioningState.class));
newLroInitFunction(client));

Mono<FooWithProvisioningState> resultMonoWithTimeout = lroFlux.last()
.flatMap(AsyncPollResponse::getFinalResult)
Expand Down Expand Up @@ -292,7 +297,7 @@ public void lroRetryAfter() {
FooWithProvisioningState.class,
FooWithProvisioningState.class,
POLLING_DURATION,
newLroInitFunction(client, FooWithProvisioningState.class));
newLroInitFunction(client));

long nanoTime = System.nanoTime();

Expand All @@ -313,6 +318,46 @@ public void lroRetryAfter() {
}
}

@Test
public void lroContext() {
WireMockServer lroServer = startServer();
lroServer.start();

HttpPipelinePolicy contextVerifyPolicy = (context, next) -> {
Optional<Object> valueOpt = context.getData("key1");
if (valueOpt.isPresent() && "value1".equals(valueOpt.get())) {
return next.process();
} else {
return Mono.error(new AssertionError());
}
};

try {
final ProvisioningStateLroServiceClient client = RestProxy.create(ProvisioningStateLroServiceClient.class,
createHttpPipeline(lroServer.port(), Collections.singletonList(contextVerifyPolicy)),
SERIALIZER);

Flux<AsyncPollResponse<PollResult<FooWithProvisioningState>, FooWithProvisioningState>> lroFlux
= PollerFactory.create(SERIALIZER,
new HttpPipelineBuilder().build(),
FooWithProvisioningState.class,
FooWithProvisioningState.class,
POLLING_DURATION,
newLroInitFunction(client));
lroFlux = lroFlux.subscriberContext(context -> context.put("key1", "value1"));

FooWithProvisioningState result = lroFlux
.blockLast()
.getFinalResult()
.block();
Assertions.assertNotNull(result);
} finally {
if (lroServer.isRunning()) {
lroServer.shutdown();
}
}
}

private static class ServerConfigure {
private int pollingCountTillSuccess = 2;
private HttpHeaders additionalHeaders = HttpHeaders.noHeaders();
Expand Down Expand Up @@ -394,30 +439,36 @@ private static WireMockServer createServer(ResponseTransformer transformer,
}

private static HttpPipeline createHttpPipeline(int port) {
return new HttpPipelineBuilder()
.policies(new HttpPipelinePolicy() {
@Override
public Mono<HttpResponse> process(HttpPipelineCallContext context,
HttpPipelineNextPolicy next) {
HttpRequest request = context.getHttpRequest();
request.setUrl(updatePort(request.getUrl(), port));
context.setHttpRequest(request);
return next.process();
}
return createHttpPipeline(port, Collections.emptyList());
}

private URL updatePort(URL url, int port) {
try {
return new URL(url.getProtocol(), url.getHost(), port, url.getFile());
} catch (MalformedURLException mue) {
throw new RuntimeException(mue);
}
private static HttpPipeline createHttpPipeline(int port, List<HttpPipelinePolicy> additionalPolicies) {
List<HttpPipelinePolicy> policies = new ArrayList<>(additionalPolicies);
policies.add(new HttpPipelinePolicy() {
@Override
public Mono<HttpResponse> process(HttpPipelineCallContext context,
HttpPipelineNextPolicy next) {
HttpRequest request = context.getHttpRequest();
request.setUrl(updatePort(request.getUrl(), port));
context.setHttpRequest(request);
return next.process();
}

private URL updatePort(URL url, int port) {
try {
return new URL(url.getProtocol(), url.getHost(), port, url.getFile());
} catch (MalformedURLException mue) {
throw new RuntimeException(mue);
}
})
}
});
return new HttpPipelineBuilder()
.policies(policies.toArray(new HttpPipelinePolicy[0]))
.build();
}

private Mono<Response<Flux<ByteBuffer>>> newLroInitFunction(ProvisioningStateLroServiceClient client, Type type) {
return client.startLro();
private Mono<Response<Flux<ByteBuffer>>> newLroInitFunction(ProvisioningStateLroServiceClient client) {
return FluxUtil.fluxContext(context -> client.startLro(context).flux()).next();
}

private static String toJson(Object object) {
Expand Down

0 comments on commit e52686f

Please sign in to comment.