Skip to content

Commit

Permalink
fix: reduce duplicate code in streaming retries and add a test (#1636)
Browse files Browse the repository at this point in the history
* fix: reduce duplicate code in streaming retries and add a test

* add comments
  • Loading branch information
leahecole committed Aug 2, 2024
1 parent b5d984a commit 83b52e1
Show file tree
Hide file tree
Showing 2 changed files with 78 additions and 14 deletions.
26 changes: 12 additions & 14 deletions gax/src/streamingCalls/streaming.ts
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,15 @@ export class StreamProxy extends duplexify implements GRPCCallResult {
this.gaxServerStreamingRetries = gaxServerStreamingRetries;
}

private shouldRetryRequest(error: Error, retry: RetryOptions): boolean {
const e = GoogleError.parseGRPCStatusDetails(error);
let shouldRetry = this.defaultShouldRetry(e!, retry);
if (retry.shouldRetryFn) {
shouldRetry = retry.shouldRetryFn(e!);
}
return shouldRetry;
}

cancel() {
if (this.stream) {
this.stream.cancel();
Expand Down Expand Up @@ -228,13 +237,7 @@ export class StreamProxy extends duplexify implements GRPCCallResult {
}

this.retries!++;
const e = GoogleError.parseGRPCStatusDetails(error);
let shouldRetry = this.defaultShouldRetry(e!, retry);
if (retry.shouldRetryFn) {
shouldRetry = retry.shouldRetryFn(e!);
}

if (shouldRetry) {
if (this.shouldRetryRequest(error, retry)) {
const toSleep = Math.random() * delay;
setTimeout(() => {
now = new Date();
Expand All @@ -246,6 +249,7 @@ export class StreamProxy extends duplexify implements GRPCCallResult {
timeout = Math.min(timeoutCal, rpcTimeout, newDeadline);
}, toSleep);
} else {
const e = GoogleError.parseGRPCStatusDetails(error);
e.note =
'Exception occurred in retry method that was ' +
'not classified as transient';
Expand Down Expand Up @@ -377,13 +381,7 @@ export class StreamProxy extends duplexify implements GRPCCallResult {
const timeout = retry.backoffSettings.totalTimeoutMillis;
const maxRetries = retry.backoffSettings.maxRetries!;
if ((maxRetries && maxRetries > 0) || (timeout && timeout > 0)) {
const e = GoogleError.parseGRPCStatusDetails(error);
let shouldRetry = this.defaultShouldRetry(e!, retry);
if (retry.shouldRetryFn) {
shouldRetry = retry.shouldRetryFn(e!);
}

if (shouldRetry) {
if (this.shouldRetryRequest(error, retry)) {
if (maxRetries && timeout!) {
const newError = new GoogleError(
'Cannot set both totalTimeoutMillis and maxRetries ' +
Expand Down
66 changes: 66 additions & 0 deletions gax/test/test-application/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -151,6 +151,10 @@ async function testShowcase() {
);

await testErrorMaxRetries0(grpcSequenceClientWithServerStreamingRetries);
await testServerStreamingRetriesImmediatelywithRetryOptions(
grpcSequenceClientWithServerStreamingRetries
);

// ensure legacy tests pass with streaming retries client
await testEcho(grpcClientWithServerStreamingRetries);
await testEchoError(grpcClientWithServerStreamingRetries);
Expand Down Expand Up @@ -1219,6 +1223,68 @@ async function testErrorMaxRetries0(client: SequenceServiceClient) {
});
});
}
// a streaming call that retries two times and finishes successfully
async function testServerStreamingRetriesImmediatelywithRetryOptions(
client: SequenceServiceClient
) {
const finalData: string[] = [];
const backoffSettings = createBackoffSettings(
100,
1.2,
1000,
null,
1.5,
3000,
10000
);

// allow the two codes we are going to send as errors
const retryOptions = new RetryOptions([14, 4], backoffSettings);

const settings = {
retry: retryOptions,
};

client.initialize();

// errors immediately, then again after sending "This is"
const request = createStreamingSequenceRequestFactory(
[Status.UNAVAILABLE, Status.DEADLINE_EXCEEDED, Status.OK],
[0.1, 0.1, 0.1],
[0, 2, 11],
'This is testing the brand new and shiny StreamingSequence server 3'
);

const response = await client.createStreamingSequence(request);
await new Promise<void>((resolve, reject) => {
const sequence = response[0];

const attemptRequest =
new protos.google.showcase.v1beta1.AttemptStreamingSequenceRequest();
attemptRequest.name = sequence.name!;

const attemptStream = client.attemptStreamingSequence(
attemptRequest,
settings
);
attemptStream.on('data', (response: {content: string}) => {
finalData.push(response.content);
});
attemptStream.on('error', error => {
reject(error);
});
attemptStream.on('end', () => {
attemptStream.end();

resolve();
});
}).then(() => {
assert.equal(
finalData.join(' '),
'This is This is testing the brand new and shiny StreamingSequence server 3'
);
});
}

async function main() {
const showcaseServer = new ShowcaseServer();
Expand Down

0 comments on commit 83b52e1

Please sign in to comment.