Skip to content

Commit

Permalink
Fix DownloadResponseTests that began to fail after upgrading Reactor …
Browse files Browse the repository at this point in the history
…version (#13463)
  • Loading branch information
alzimmermsft authored Jul 23, 2020
1 parent 1806a77 commit 5af3d0a
Show file tree
Hide file tree
Showing 2 changed files with 43 additions and 58 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -11,17 +11,15 @@
import com.azure.storage.blob.implementation.models.BlobsDownloadResponse;
import com.azure.storage.blob.models.BlobStorageException;
import com.azure.storage.blob.models.DownloadRetryOptions;
import reactor.core.CoreSubscriber;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.publisher.Operators;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.charset.Charset;
import java.util.concurrent.TimeoutException;

class DownloadResponseMockFlux extends Flux<ByteBuffer> {
class DownloadResponseMockFlux {
static final int DR_TEST_SCENARIO_SUCCESSFUL_ONE_CHUNK = 0; // Data emitted in one chunk
static final int DR_TEST_SCENARIO_SUCCESSFUL_MULTI_CHUNK = 1; // Data emitted in multiple chunks
static final int DR_TEST_SCENARIO_SUCCESSFUL_STREAM_FAILURES = 2; // Stream failures successfully handled
Expand All @@ -33,10 +31,11 @@ class DownloadResponseMockFlux extends Flux<ByteBuffer> {
static final int DR_TEST_SCENARIO_TIMEOUT = 10; // ReliableDownload with timeout after not receiving items for 60s
static final int DR_TEST_SCENARIO_ERROR_AFTER_ALL_DATA = 11; // Don't actually issue another retry if we've read all the data and the source failed at the end

private int scenario;
private final int scenario;
private final ByteBuffer scenarioData;

private int tryNumber;
private HttpGetterInfo info;
private ByteBuffer scenarioData;
private DownloadRetryOptions options;
private boolean subscribed = false; // Only used for multiple subscription test.

Expand Down Expand Up @@ -89,27 +88,23 @@ DownloadResponseMockFlux setOptions(DownloadRetryOptions options) {
return this;
}

@Override
public void subscribe(CoreSubscriber<? super ByteBuffer> subscriber) {
private Flux<ByteBuffer> getDownloadStream() {
switch (this.scenario) {
case DR_TEST_SCENARIO_SUCCESSFUL_ONE_CHUNK:
subscriber.onNext(this.scenarioData.duplicate());
Operators.complete(subscriber);
break;
return Flux.just(scenarioData.duplicate());

case DR_TEST_SCENARIO_SUCCESSFUL_MULTI_CHUNK:
for (int i = 0; i < 4; i++) {
return Flux.range(0, 4).map(i -> {
ByteBuffer toSend = this.scenarioData.duplicate();
toSend.position(i * 256);
toSend.limit((i + 1) * 256);
subscriber.onNext(toSend);
}
Operators.complete(subscriber);
break;

return toSend;
});

case DR_TEST_SCENARIO_NO_MULTIPLE_SUBSCRIPTION:
if (this.subscribed) {
throw new IllegalStateException("Cannot subscribe to the same flux twice");
return Flux.error(new IllegalStateException("Cannot subscribe to the same flux twice"));
}
this.subscribed = true;
// fall through to test data
Expand All @@ -119,81 +114,71 @@ public void subscribe(CoreSubscriber<? super ByteBuffer> subscriber) {
// tryNumber is 1 indexed, so we have to sub 1.
if (this.info.getOffset() != (this.tryNumber - 1) * 256
|| this.info.getCount() != this.scenarioData.remaining() - (this.tryNumber - 1) * 256) {
Operators.error(subscriber, new IllegalArgumentException("Info values are incorrect."));
return;
return Flux.error(new IllegalArgumentException("Info values are incorrect."));
}

ByteBuffer toSend = this.scenarioData.duplicate();
toSend.position((this.tryNumber - 1) * 256);
toSend.limit(this.tryNumber * 256);
subscriber.onNext(toSend);

Flux<ByteBuffer> dataStream = Flux.just(toSend);

// A slightly odd but sufficient means of exercising the different retriable exceptions.
Exception e = tryNumber % 2 == 0 ? new IOException() : new TimeoutException();
Operators.error(subscriber, e);
break;

return dataStream.concatWith(Flux.error(e));
}
if (this.info.getOffset() != (this.tryNumber - 1) * 256
|| this.info.getCount() != this.scenarioData.remaining() - (this.tryNumber - 1) * 256) {
Operators.error(subscriber, new IllegalArgumentException("Info values are incorrect."));
return;
return Flux.error(new IllegalArgumentException("Info values are incorrect."));
}
ByteBuffer toSend = this.scenarioData.duplicate();
toSend.position((this.tryNumber - 1) * 256);
toSend.limit(this.tryNumber * 256);
subscriber.onNext(toSend);
Operators.complete(subscriber);
break;

return Flux.just(toSend);

case DR_TEST_SCENARIO_ERROR_AFTER_ALL_DATA:
subscriber.onNext(this.scenarioData.duplicate());
Operators.error(subscriber, new IOException("Exception at end"));
break;
return Flux.just(scenarioData.duplicate()).concatWith(Flux.error(new IOException("Exception at end")));

case DR_TEST_SCENARIO_MAX_RETRIES_EXCEEDED:
Operators.error(subscriber, new IOException());
break;
return Flux.error(new IOException());

case DR_TEST_SCENARIO_NON_RETRYABLE_ERROR:
Operators.error(subscriber, new Exception());
break;
return Flux.error(new Exception());

case DR_TEST_SCENARIO_ERROR_GETTER_MIDDLE:
if (this.tryNumber == 1) {
/*
We return a retryable error here so we have to invoke the getter, which will throw an error in
this case.
*/
Operators.error(subscriber, new IOException());
} else {
Operators.error(subscriber, new IllegalArgumentException("Retried after getter error."));
}
break;
/*
* We return a retryable error here so we have to invoke the getter, which will throw an error in
* this case.
*/
return (this.tryNumber == 1)
? Flux.error(new IOException())
: Flux.error(new IllegalArgumentException("Retried after getter error."));

case DR_TEST_SCENARIO_INFO_TEST:
switch (this.tryNumber) {
case 1: // Test the value of info when getting the initial response.
case 2: // Test the value of info when getting an intermediate response.
Operators.error(subscriber, new IOException());
break;
return Flux.error(new IOException());
case 3:
// All calls to getter checked. Exit. This test does not check for data.
Operators.complete(subscriber);
break;
return Flux.empty();
default:
throw new IllegalArgumentException("Invalid try number.");
return Flux.error(new IllegalArgumentException("Invalid try number."));
}
break;

case DR_TEST_SCENARIO_TIMEOUT:
try {
Thread.sleep(61 * 1000L); // We hard code a timeout of 60s
} catch (InterruptedException e) {
e.printStackTrace();
}
Operators.complete(subscriber);
break;

return Flux.empty();

default:
Operators.error(subscriber, new IllegalArgumentException("Invalid test case"));
return Flux.error(new IllegalArgumentException("Invalid test case"));
}
}

Expand All @@ -202,9 +187,9 @@ Mono<ReliableDownload> getter(HttpGetterInfo info) {
this.info = info;
long contentUpperBound = info.getCount() == null
? this.scenarioData.remaining() - 1 : info.getOffset() + info.getCount() - 1;
BlobsDownloadResponse rawResponse = new BlobsDownloadResponse(null, 200, new HttpHeaders(), this,
new BlobDownloadHeaders().setContentRange(String.format("%d-%d/%d",
info.getOffset(), contentUpperBound, this.scenarioData.remaining())));
BlobsDownloadResponse rawResponse = new BlobsDownloadResponse(null, 200, new HttpHeaders(),
this.getDownloadStream(), new BlobDownloadHeaders().setContentRange(String.format("%d-%d/%d",
info.getOffset(), contentUpperBound, this.scenarioData.remaining())));
ReliableDownload response = new ReliableDownload(rawResponse, options, info, this::getter);

switch (this.scenario) {
Expand Down Expand Up @@ -266,7 +251,7 @@ public Mono<String> getBodyAsString(Charset charset) {
// Construct a new flux each time to mimic getting a new download stream.
DownloadResponseMockFlux nextFlux = new DownloadResponseMockFlux(this.scenario, this.tryNumber,
this.scenarioData, this.info, this.options);
rawResponse = new BlobsDownloadResponse(null, 200, new HttpHeaders(), nextFlux,
rawResponse = new BlobsDownloadResponse(null, 200, new HttpHeaders(), nextFlux.getDownloadStream(),
new BlobDownloadHeaders());
response = new ReliableDownload(rawResponse, options, info, this::getter);
return Mono.just(response);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -166,8 +166,8 @@ class DownloadResponseTest extends APISpec {
response.getValue().subscribeOn(Schedulers.elastic()).then().block(Duration.ofSeconds((retryCount + 1) * 62))

then:
def e = thrown(Exceptions.ReactiveException)
e.getCause() instanceof TimeoutException
def e = thrown(Throwable)
Exceptions.unwrap(e) instanceof TimeoutException

where:
// We test retry count elsewhere. Just using small numbers to speed up the test.
Expand Down

0 comments on commit 5af3d0a

Please sign in to comment.