From 83b52e14ba3bb1e9f0e079ea53e47ec32b4cfd45 Mon Sep 17 00:00:00 2001 From: "Leah E. Cole" <6719667+leahecole@users.noreply.github.com> Date: Fri, 2 Aug 2024 13:23:41 -0400 Subject: [PATCH] fix: reduce duplicate code in streaming retries and add a test (#1636) * fix: reduce duplicate code in streaming retries and add a test * add comments --- gax/src/streamingCalls/streaming.ts | 26 +++++----- gax/test/test-application/src/index.ts | 66 ++++++++++++++++++++++++++ 2 files changed, 78 insertions(+), 14 deletions(-) diff --git a/gax/src/streamingCalls/streaming.ts b/gax/src/streamingCalls/streaming.ts index c74facaa3..583425fc9 100644 --- a/gax/src/streamingCalls/streaming.ts +++ b/gax/src/streamingCalls/streaming.ts @@ -119,6 +119,15 @@ export class StreamProxy extends duplexify implements GRPCCallResult { this.gaxServerStreamingRetries = gaxServerStreamingRetries; } + private shouldRetryRequest(error: Error, retry: RetryOptions): boolean { + const e = GoogleError.parseGRPCStatusDetails(error); + let shouldRetry = this.defaultShouldRetry(e!, retry); + if (retry.shouldRetryFn) { + shouldRetry = retry.shouldRetryFn(e!); + } + return shouldRetry; + } + cancel() { if (this.stream) { this.stream.cancel(); @@ -228,13 +237,7 @@ export class StreamProxy extends duplexify implements GRPCCallResult { } this.retries!++; - const e = GoogleError.parseGRPCStatusDetails(error); - let shouldRetry = this.defaultShouldRetry(e!, retry); - if (retry.shouldRetryFn) { - shouldRetry = retry.shouldRetryFn(e!); - } - - if (shouldRetry) { + if (this.shouldRetryRequest(error, retry)) { const toSleep = Math.random() * delay; setTimeout(() => { now = new Date(); @@ -246,6 +249,7 @@ export class StreamProxy extends duplexify implements GRPCCallResult { timeout = Math.min(timeoutCal, rpcTimeout, newDeadline); }, toSleep); } else { + const e = GoogleError.parseGRPCStatusDetails(error); e.note = 'Exception occurred in retry method that was ' + 'not classified as transient'; @@ -377,13 +381,7 @@ export class StreamProxy extends duplexify implements GRPCCallResult { const timeout = retry.backoffSettings.totalTimeoutMillis; const maxRetries = retry.backoffSettings.maxRetries!; if ((maxRetries && maxRetries > 0) || (timeout && timeout > 0)) { - const e = GoogleError.parseGRPCStatusDetails(error); - let shouldRetry = this.defaultShouldRetry(e!, retry); - if (retry.shouldRetryFn) { - shouldRetry = retry.shouldRetryFn(e!); - } - - if (shouldRetry) { + if (this.shouldRetryRequest(error, retry)) { if (maxRetries && timeout!) { const newError = new GoogleError( 'Cannot set both totalTimeoutMillis and maxRetries ' + diff --git a/gax/test/test-application/src/index.ts b/gax/test/test-application/src/index.ts index c83a733f3..f4500170f 100644 --- a/gax/test/test-application/src/index.ts +++ b/gax/test/test-application/src/index.ts @@ -151,6 +151,10 @@ async function testShowcase() { ); await testErrorMaxRetries0(grpcSequenceClientWithServerStreamingRetries); + await testServerStreamingRetriesImmediatelywithRetryOptions( + grpcSequenceClientWithServerStreamingRetries + ); + // ensure legacy tests pass with streaming retries client await testEcho(grpcClientWithServerStreamingRetries); await testEchoError(grpcClientWithServerStreamingRetries); @@ -1219,6 +1223,68 @@ async function testErrorMaxRetries0(client: SequenceServiceClient) { }); }); } +// a streaming call that retries two times and finishes successfully +async function testServerStreamingRetriesImmediatelywithRetryOptions( + client: SequenceServiceClient +) { + const finalData: string[] = []; + const backoffSettings = createBackoffSettings( + 100, + 1.2, + 1000, + null, + 1.5, + 3000, + 10000 + ); + + // allow the two codes we are going to send as errors + const retryOptions = new RetryOptions([14, 4], backoffSettings); + + const settings = { + retry: retryOptions, + }; + + client.initialize(); + + // errors immediately, then again after sending "This is" + const request = createStreamingSequenceRequestFactory( + [Status.UNAVAILABLE, Status.DEADLINE_EXCEEDED, Status.OK], + [0.1, 0.1, 0.1], + [0, 2, 11], + 'This is testing the brand new and shiny StreamingSequence server 3' + ); + + const response = await client.createStreamingSequence(request); + await new Promise((resolve, reject) => { + const sequence = response[0]; + + const attemptRequest = + new protos.google.showcase.v1beta1.AttemptStreamingSequenceRequest(); + attemptRequest.name = sequence.name!; + + const attemptStream = client.attemptStreamingSequence( + attemptRequest, + settings + ); + attemptStream.on('data', (response: {content: string}) => { + finalData.push(response.content); + }); + attemptStream.on('error', error => { + reject(error); + }); + attemptStream.on('end', () => { + attemptStream.end(); + + resolve(); + }); + }).then(() => { + assert.equal( + finalData.join(' '), + 'This is This is testing the brand new and shiny StreamingSequence server 3' + ); + }); +} async function main() { const showcaseServer = new ShowcaseServer();