From d6ef1c722a9c3bd4de6f1aaebbea91161f83df69 Mon Sep 17 00:00:00 2001 From: Jianjun Liao <36503113+Leavrth@users.noreply.github.com> Date: Wed, 24 Jan 2024 18:52:51 +0800 Subject: [PATCH] br: add more retry strategy (s3.ReadFile: body reader / pushBackup: backoffer) (#50541) close pingcap/tidb#49942 --- br/pkg/backup/client.go | 61 ++++++++---------------------------- br/pkg/storage/s3.go | 53 ++++++++++++++++++++++--------- br/pkg/storage/s3_test.go | 38 ++++++++++++++++++++++ br/pkg/utils/BUILD.bazel | 2 +- br/pkg/utils/backoff.go | 33 ++++++++++++++++++- br/pkg/utils/backoff_test.go | 19 +++++++++++ br/pkg/utils/retry.go | 1 + br/tests/br_full/run.sh | 2 +- 8 files changed, 143 insertions(+), 66 deletions(-) diff --git a/br/pkg/backup/client.go b/br/pkg/backup/client.go index 9e6af11b9d326..fff4ae0f84590 100644 --- a/br/pkg/backup/client.go +++ b/br/pkg/backup/client.go @@ -1437,57 +1437,22 @@ func SendBackup( var errReset error var errBackup error - for retry := 0; retry < backupRetryTimes; retry++ { - logutil.CL(ctx).Info("try backup", - zap.Int("retry time", retry), - ) + retry := -1 + return utils.WithRetry(ctx, func() error { + retry += 1 + if retry != 0 { + client, errReset = resetFn() + if errReset != nil { + return errors.Annotatef(errReset, "failed to reset backup connection on store:%d "+ + "please check the tikv status", storeID) + } + } + logutil.CL(ctx).Info("try backup", zap.Int("retry time", retry)) errBackup = doSendBackup(ctx, client, req, respFn) if errBackup != nil { - if isRetryableError(errBackup) { - time.Sleep(3 * time.Second) - client, errReset = resetFn() - if errReset != nil { - return errors.Annotatef(errReset, "failed to reset backup connection on store:%d "+ - "please check the tikv status", storeID) - } - continue - } - logutil.CL(ctx).Error("fail to backup", zap.Uint64("StoreID", storeID), zap.Int("retry", retry)) return berrors.ErrFailedToConnect.Wrap(errBackup).GenWithStack("failed to create backup stream to store %d", storeID) } - // finish backup - break - } - return nil -} - -// gRPC communication cancelled with connection closing -const ( - gRPC_Cancel = "the client connection is closing" -) - -// isRetryableError represents whether we should retry reset grpc connection. -func isRetryableError(err error) bool { - // some errors can be retried - // https://github.com/pingcap/tidb/issues/34350 - switch status.Code(err) { - case codes.Unavailable, codes.DeadlineExceeded, - codes.ResourceExhausted, codes.Aborted, codes.Internal: - { - log.Warn("backup met some errors, these errors can be retry 5 times", zap.Error(err)) - return true - } - } - // At least, there are two possible cancel() call, - // one from backup range, another from gRPC, here we retry when gRPC cancel with connection closing - if status.Code(err) == codes.Canceled { - if s, ok := status.FromError(err); ok { - if strings.Contains(s.Message(), gRPC_Cancel) { - log.Warn("backup met grpc cancel error, this errors can be retry 5 times", zap.Error(err)) - return true - } - } - } - return false + return nil + }, utils.NewBackupSSTBackoffer()) } diff --git a/br/pkg/storage/s3.go b/br/pkg/storage/s3.go index 22c3250de9bf0..0abb60838fbf4 100644 --- a/br/pkg/storage/s3.go +++ b/br/pkg/storage/s3.go @@ -559,22 +559,41 @@ func (rs *S3Storage) WriteFile(ctx context.Context, file string, data []byte) er // ReadFile reads the file from the storage and returns the contents. 
func (rs *S3Storage) ReadFile(ctx context.Context, file string) ([]byte, error) { - input := &s3.GetObjectInput{ - Bucket: aws.String(rs.options.Bucket), - Key: aws.String(rs.options.Prefix + file), - } - result, err := rs.svc.GetObjectWithContext(ctx, input) - if err != nil { - return nil, errors.Annotatef(err, - "failed to read s3 file, file info: input.bucket='%s', input.key='%s'", - *input.Bucket, *input.Key) - } - defer result.Body.Close() - data, err := io.ReadAll(result.Body) - if err != nil { - return nil, errors.Trace(err) + var ( + data []byte + readErr error + ) + for retryCnt := 0; retryCnt < maxErrorRetries; retryCnt += 1 { + input := &s3.GetObjectInput{ + Bucket: aws.String(rs.options.Bucket), + Key: aws.String(rs.options.Prefix + file), + } + result, err := rs.svc.GetObjectWithContext(ctx, input) + if err != nil { + return nil, errors.Annotatef(err, + "failed to read s3 file, file info: input.bucket='%s', input.key='%s'", + *input.Bucket, *input.Key) + } + data, readErr = io.ReadAll(result.Body) + // close the body of response since data has been already read out + result.Body.Close() + // for unit test + failpoint.Inject("read-s3-body-failed", func(_ failpoint.Value) { + log.Info("original error", zap.Error(readErr)) + readErr = errors.Errorf("read: connection reset by peer") + }) + if readErr != nil { + if isDeadlineExceedError(readErr) || isCancelError(readErr) { + return nil, errors.Annotatef(readErr, "failed to read body from get object result, file info: input.bucket='%s', input.key='%s', retryCnt='%d'", + *input.Bucket, *input.Key, retryCnt) + } + continue + } + return data, nil } - return data, nil + // retry too much, should be failed + return nil, errors.Annotatef(readErr, "failed to read body from get object result (retry too much), file info: input.bucket='%s', input.key='%s'", + rs.options.Bucket, rs.options.Prefix+file) } // DeleteFile delete the file in s3 storage @@ -1104,6 +1123,10 @@ type retryerWithLog struct { client.DefaultRetryer } +func isCancelError(err error) bool { + return strings.Contains(err.Error(), "context canceled") +} + func isDeadlineExceedError(err error) bool { // TODO find a better way. // Known challenges: diff --git a/br/pkg/storage/s3_test.go b/br/pkg/storage/s3_test.go index 829e2049bdbcd..1fa5ae3a32a82 100644 --- a/br/pkg/storage/s3_test.go +++ b/br/pkg/storage/s3_test.go @@ -12,6 +12,7 @@ import ( "net/http" "net/http/httptest" "os" + "strings" "sync" "testing" @@ -1383,3 +1384,40 @@ func TestRetryError(t *testing.T) { require.NoError(t, err) require.Equal(t, count, int32(2)) } + +func TestS3ReadFileRetryable(t *testing.T) { + s := createS3Suite(t) + ctx := aws.BackgroundContext() + errMsg := "just some unrelated error" + expectedErr := errors.New(errMsg) + + s.s3.EXPECT(). + GetObjectWithContext(ctx, gomock.Any()). + DoAndReturn(func(_ context.Context, input *s3.GetObjectInput, opt ...request.Option) (*s3.GetObjectOutput, error) { + require.Equal(t, "bucket", aws.StringValue(input.Bucket)) + require.Equal(t, "prefix/file", aws.StringValue(input.Key)) + return &s3.GetObjectOutput{ + Body: io.NopCloser(bytes.NewReader([]byte("test"))), + }, nil + }) + s.s3.EXPECT(). + GetObjectWithContext(ctx, gomock.Any()). 
+ DoAndReturn(func(_ context.Context, input *s3.GetObjectInput, opt ...request.Option) (*s3.GetObjectOutput, error) { + require.Equal(t, "bucket", aws.StringValue(input.Bucket)) + require.Equal(t, "prefix/file", aws.StringValue(input.Key)) + return &s3.GetObjectOutput{ + Body: io.NopCloser(bytes.NewReader([]byte("test"))), + }, nil + }) + s.s3.EXPECT(). + GetObjectWithContext(ctx, gomock.Any()). + Return(nil, expectedErr) + + require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/br/pkg/storage/read-s3-body-failed", "2*return(true)")) + defer func() { + failpoint.Disable("github.com/pingcap/tidb/br/pkg/storage/read-s3-body-failed") + }() + _, err := s.storage.ReadFile(ctx, "file") + require.Error(t, err) + require.True(t, strings.Contains(err.Error(), errMsg)) +} diff --git a/br/pkg/utils/BUILD.bazel b/br/pkg/utils/BUILD.bazel index 6dc2f2a7420f8..b9db66c4e18cd 100644 --- a/br/pkg/utils/BUILD.bazel +++ b/br/pkg/utils/BUILD.bazel @@ -87,7 +87,7 @@ go_test( ], embed = [":utils"], flaky = True, - shard_count = 33, + shard_count = 34, deps = [ "//br/pkg/errors", "//pkg/kv", diff --git a/br/pkg/utils/backoff.go b/br/pkg/utils/backoff.go index 7f2f04cca9db5..6b7aa7a127863 100644 --- a/br/pkg/utils/backoff.go +++ b/br/pkg/utils/backoff.go @@ -6,6 +6,7 @@ import ( "context" "database/sql" "io" + "strings" "time" "github.com/pingcap/errors" @@ -26,6 +27,10 @@ const ( downloadSSTWaitInterval = 1 * time.Second downloadSSTMaxWaitInterval = 4 * time.Second + backupSSTRetryTimes = 5 + backupSSTWaitInterval = 2 * time.Second + backupSSTMaxWaitInterval = 3 * time.Second + resetTSRetryTime = 16 resetTSWaitInterval = 50 * time.Millisecond resetTSMaxWaitInterval = 500 * time.Millisecond @@ -42,8 +47,21 @@ const ( ChecksumRetryTime = 8 ChecksumWaitInterval = 1 * time.Second ChecksumMaxWaitInterval = 30 * time.Second + + gRPC_Cancel = "the client connection is closing" ) +// At least, there are two possible cancel() call, +// one from go context, another from gRPC, here we retry when gRPC cancel with connection closing +func isGRPCCancel(err error) bool { + if s, ok := status.FromError(err); ok { + if strings.Contains(s.Message(), gRPC_Cancel) { + return true + } + } + return false +} + // RetryState is the mutable state needed for retrying. // It likes the `utils.Backoffer`, but more fundamental: // this only control the backoff time and knows nothing about what error happens. @@ -143,6 +161,11 @@ func NewDownloadSSTBackoffer() Backoffer { return NewBackoffer(downloadSSTRetryTimes, downloadSSTWaitInterval, downloadSSTMaxWaitInterval, errContext) } +func NewBackupSSTBackoffer() Backoffer { + errContext := NewErrorContext("backup sst", 3) + return NewBackoffer(backupSSTRetryTimes, backupSSTWaitInterval, backupSSTMaxWaitInterval, errContext) +} + func (bo *importerBackoffer) NextBackoff(err error) time.Duration { log.Warn("retry to import ssts", zap.Int("attempt", bo.attempt), zap.Error(err)) // we don't care storeID here. 
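The client.go hunk above replaces SendBackup's hand-rolled retry loop (and its local isRetryableError/gRPC_Cancel helpers) with utils.WithRetry driven by the new NewBackupSSTBackoffer, whose knobs are added in backoff.go: backupSSTRetryTimes = 5, backupSSTWaitInterval = 2s, backupSSTMaxWaitInterval = 3s. The following Go sketch only illustrates that Backoffer/WithRetry shape; the names withRetry, toyBackoffer and errRetriable are assumed for the illustration and are not the br/pkg/utils implementation.

package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

// backoffer decides how long to wait before the next attempt and how many
// attempts are left; it mirrors the contract utils.WithRetry expects.
type backoffer interface {
	NextBackoff(err error) time.Duration
	Attempt() int
}

// withRetry keeps calling fn until it succeeds, the backoffer gives up,
// or the context is cancelled.
func withRetry(ctx context.Context, fn func() error, bo backoffer) error {
	var lastErr error
	for bo.Attempt() > 0 {
		lastErr = fn()
		if lastErr == nil {
			return nil
		}
		select {
		case <-ctx.Done():
			return ctx.Err()
		case <-time.After(bo.NextBackoff(lastErr)):
		}
	}
	return lastErr
}

var errRetriable = errors.New("temporary failure")

// toyBackoffer doubles the delay on every retriable error (capped at maxDelay)
// and gives up immediately on anything else, similar in spirit to importerBackoffer.
type toyBackoffer struct {
	attempt  int
	delay    time.Duration
	maxDelay time.Duration
}

func (b *toyBackoffer) NextBackoff(err error) time.Duration {
	if !errors.Is(err, errRetriable) {
		b.attempt = 0 // non-retriable: stop right away
		return 0
	}
	b.attempt--
	d := b.delay
	b.delay *= 2
	if b.delay > b.maxDelay {
		b.delay = b.maxDelay
	}
	return d
}

func (b *toyBackoffer) Attempt() int { return b.attempt }

func main() {
	// The patch's constants are 5 attempts, 2s initial wait, 3s cap; scaled down
	// here so the demo finishes quickly.
	bo := &toyBackoffer{attempt: 5, delay: 20 * time.Millisecond, maxDelay: 30 * time.Millisecond}
	calls := 0
	err := withRetry(context.Background(), func() error {
		calls++
		if calls < 3 {
			return errRetriable // first two attempts fail, then succeed
		}
		return nil
	}, bo)
	fmt.Println(calls, err) // 3 <nil>
}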
@@ -162,9 +185,17 @@ func (bo *importerBackoffer) NextBackoff(err error) time.Duration { bo.attempt = 0 default: switch status.Code(e) { - case codes.Unavailable, codes.Aborted, codes.DeadlineExceeded: + case codes.Unavailable, codes.Aborted, codes.DeadlineExceeded, codes.ResourceExhausted, codes.Internal: bo.delayTime = 2 * bo.delayTime bo.attempt-- + case codes.Canceled: + if isGRPCCancel(err) { + bo.delayTime = 2 * bo.delayTime + bo.attempt-- + } else { + bo.delayTime = 0 + bo.attempt = 0 + } default: // Unexpected error bo.delayTime = 0 diff --git a/br/pkg/utils/backoff_test.go b/br/pkg/utils/backoff_test.go index 857010bfc871a..316896bde3f0d 100644 --- a/br/pkg/utils/backoff_test.go +++ b/br/pkg/utils/backoff_test.go @@ -178,3 +178,22 @@ func TestNewDownloadSSTBackofferWithCancel(t *testing.T) { context.Canceled, }, multierr.Errors(err)) } + +func TestNewBackupSSTBackofferWithCancel(t *testing.T) { + var counter int + backoffer := utils.NewBackupSSTBackoffer() + err := utils.WithRetry(context.Background(), func() error { + defer func() { counter++ }() + if counter == 3 { + return context.Canceled + } + return berrors.ErrKVIngestFailed + }, backoffer) + require.Equal(t, 4, counter) + require.Equal(t, []error{ + berrors.ErrKVIngestFailed, + berrors.ErrKVIngestFailed, + berrors.ErrKVIngestFailed, + context.Canceled, + }, multierr.Errors(err)) +} diff --git a/br/pkg/utils/retry.go b/br/pkg/utils/retry.go index 559a11d978274..d754070ff80d1 100644 --- a/br/pkg/utils/retry.go +++ b/br/pkg/utils/retry.go @@ -31,6 +31,7 @@ var retryableServerError = []string{ "body write aborted", "error during dispatch", "put object timeout", + "timeout after", "internalerror", "not read from or written to within the timeout period", "requesttimeout", diff --git a/br/tests/br_full/run.sh b/br/tests/br_full/run.sh index dcfb236a85fe0..3752ce40995b9 100755 --- a/br/tests/br_full/run.sh +++ b/br/tests/br_full/run.sh @@ -53,7 +53,7 @@ test_log="${TEST_DIR}/${DB}_test.log" error_str="not read from or written to within the timeout period" unset BR_LOG_TO_TERM -export GO_FAILPOINTS="github.com/pingcap/tidb/br/pkg/backup/backup-storage-error=1*return(\"connection refused\")->1*return(\"InternalError\");github.com/pingcap/tidb/br/pkg/backup/backup-timeout-error=1*return(\"RequestTimeout\")->1*return(\"not read from or written to within the timeout period\")->1*return(\"InvalidPart\")->1*return(\"end of file before message length reached\")" +export GO_FAILPOINTS="github.com/pingcap/tidb/br/pkg/backup/backup-storage-error=1*return(\"connection refused\")->1*return(\"InternalError\");github.com/pingcap/tidb/br/pkg/backup/backup-timeout-error=1*return(\"RequestTimeout\")->1*return(\"not read from or written to within the timeout period\")->1*return(\"InvalidPart\")->1*return(\"end of file before message length reached\")->1*return(\"timeout after\")" run_br --pd $PD_ADDR backup full -s "local://$TEST_DIR/$DB-lz4" --concurrency 4 --compression lz4 --log-file $test_log export GO_FAILPOINTS="" size_lz4=$(du -d 0 $TEST_DIR/$DB-lz4 | awk '{print $1}')
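The s3.go hunk changes ReadFile so that a failure while draining the GetObject body (as opposed to the GetObject call itself) re-issues the request, up to maxErrorRetries times, bailing out early only on deadline-exceeded or context-canceled errors. Below is a standalone sketch of that re-fetch-and-re-read pattern against the aws-sdk-go v1 API; readObjectWithRetry, maxBodyRetries and fatalReadError are assumed names for illustration, not the functions in br/pkg/storage.

package main

import (
	"context"
	"fmt"
	"io"
	"strings"

	"github.com/aws/aws-sdk-go/aws"
	"github.com/aws/aws-sdk-go/aws/session"
	"github.com/aws/aws-sdk-go/service/s3"
	"github.com/aws/aws-sdk-go/service/s3/s3iface"
)

const maxBodyRetries = 3 // the patch reuses the existing maxErrorRetries constant

// fatalReadError stands in for the patch's isDeadlineExceedError/isCancelError
// checks: these failures will not improve on retry, so surface them immediately.
func fatalReadError(err error) bool {
	return strings.Contains(err.Error(), "context deadline exceeded") ||
		strings.Contains(err.Error(), "context canceled")
}

// readObjectWithRetry re-issues GetObject whenever draining the body fails with
// a transient error (e.g. "connection reset by peer" halfway through the stream).
func readObjectWithRetry(ctx context.Context, svc s3iface.S3API, bucket, key string) ([]byte, error) {
	var (
		data    []byte
		readErr error
	)
	for i := 0; i < maxBodyRetries; i++ {
		out, err := svc.GetObjectWithContext(ctx, &s3.GetObjectInput{
			Bucket: aws.String(bucket),
			Key:    aws.String(key),
		})
		if err != nil {
			// the request itself failed; the SDK's retryer has already had its turn
			return nil, err
		}
		data, readErr = io.ReadAll(out.Body)
		out.Body.Close() // always close: the body has been consumed either way
		if readErr == nil {
			return data, nil
		}
		if fatalReadError(readErr) {
			return nil, readErr
		}
		// transient body-read failure: loop around and fetch the object again
	}
	return nil, fmt.Errorf("read s3 object body failed after %d attempts: %w", maxBodyRetries, readErr)
}

func main() {
	// Needs real credentials/region to succeed at runtime; shown only for the call shape.
	sess := session.Must(session.NewSession())
	data, err := readObjectWithRetry(context.Background(), s3.New(sess), "my-bucket", "prefix/backupmeta")
	fmt.Println(len(data), err)
}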
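The backoff.go hunk also widens importerBackoffer.NextBackoff: ResourceExhausted and Internal now back off like Unavailable/Aborted/DeadlineExceeded, and Canceled is retried only when the status message contains "the client connection is closing" (a gRPC-side teardown) rather than a cancellation from our own context. A small sketch of that classification follows; shouldBackOff is an assumed helper name, while the real switch additionally doubles delayTime and decrements attempt.

package main

import (
	"fmt"
	"strings"

	"google.golang.org/grpc/codes"
	"google.golang.org/grpc/status"
)

// gRPC reports "the client connection is closing" when it cancels an RPC
// because the underlying connection is being torn down.
const grpcCancelMsg = "the client connection is closing"

// shouldBackOff reports whether a backup RPC error is worth another attempt.
func shouldBackOff(err error) bool {
	switch status.Code(err) {
	case codes.Unavailable, codes.Aborted, codes.DeadlineExceeded,
		codes.ResourceExhausted, codes.Internal:
		// transient server-side conditions: retry after backing off
		return true
	case codes.Canceled:
		// a cancel can come from our own context (give up) or from gRPC tearing
		// down the connection (retry); the status message is how we tell them apart
		if s, ok := status.FromError(err); ok {
			return strings.Contains(s.Message(), grpcCancelMsg)
		}
		return false
	default:
		return false
	}
}

func main() {
	retriable := status.Error(codes.Canceled, "rpc error: the client connection is closing")
	fatal := status.Error(codes.Canceled, "context canceled")
	fmt.Println(shouldBackOff(retriable), shouldBackOff(fatal)) // true false
}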