Skip to content

Commit

Permalink
br: add more retry strategy (s3.ReadFile: body reader / pushBackup: b…
Browse files Browse the repository at this point in the history
…ackoffer) (#50541)

close #49942
  • Loading branch information
Leavrth committed Jan 24, 2024
1 parent c1cae24 commit d6ef1c7
Show file tree
Hide file tree
Showing 8 changed files with 143 additions and 66 deletions.
61 changes: 13 additions & 48 deletions br/pkg/backup/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -1437,57 +1437,22 @@ func SendBackup(
var errReset error
var errBackup error

for retry := 0; retry < backupRetryTimes; retry++ {
logutil.CL(ctx).Info("try backup",
zap.Int("retry time", retry),
)
retry := -1
return utils.WithRetry(ctx, func() error {
retry += 1
if retry != 0 {
client, errReset = resetFn()
if errReset != nil {
return errors.Annotatef(errReset, "failed to reset backup connection on store:%d "+
"please check the tikv status", storeID)
}
}
logutil.CL(ctx).Info("try backup", zap.Int("retry time", retry))
errBackup = doSendBackup(ctx, client, req, respFn)
if errBackup != nil {
if isRetryableError(errBackup) {
time.Sleep(3 * time.Second)
client, errReset = resetFn()
if errReset != nil {
return errors.Annotatef(errReset, "failed to reset backup connection on store:%d "+
"please check the tikv status", storeID)
}
continue
}
logutil.CL(ctx).Error("fail to backup", zap.Uint64("StoreID", storeID), zap.Int("retry", retry))
return berrors.ErrFailedToConnect.Wrap(errBackup).GenWithStack("failed to create backup stream to store %d", storeID)
}
// finish backup
break
}
return nil
}

// gRPC communication cancelled with connection closing
const (
gRPC_Cancel = "the client connection is closing"
)

// isRetryableError represents whether we should retry reset grpc connection.
func isRetryableError(err error) bool {
// some errors can be retried
// https://github.com/pingcap/tidb/issues/34350
switch status.Code(err) {
case codes.Unavailable, codes.DeadlineExceeded,
codes.ResourceExhausted, codes.Aborted, codes.Internal:
{
log.Warn("backup met some errors, these errors can be retry 5 times", zap.Error(err))
return true
}
}

// At least, there are two possible cancel() call,
// one from backup range, another from gRPC, here we retry when gRPC cancel with connection closing
if status.Code(err) == codes.Canceled {
if s, ok := status.FromError(err); ok {
if strings.Contains(s.Message(), gRPC_Cancel) {
log.Warn("backup met grpc cancel error, this errors can be retry 5 times", zap.Error(err))
return true
}
}
}
return false
return nil
}, utils.NewBackupSSTBackoffer())
}
53 changes: 38 additions & 15 deletions br/pkg/storage/s3.go
Original file line number Diff line number Diff line change
Expand Up @@ -559,22 +559,41 @@ func (rs *S3Storage) WriteFile(ctx context.Context, file string, data []byte) er

// ReadFile reads the file from the storage and returns the contents.
func (rs *S3Storage) ReadFile(ctx context.Context, file string) ([]byte, error) {
input := &s3.GetObjectInput{
Bucket: aws.String(rs.options.Bucket),
Key: aws.String(rs.options.Prefix + file),
}
result, err := rs.svc.GetObjectWithContext(ctx, input)
if err != nil {
return nil, errors.Annotatef(err,
"failed to read s3 file, file info: input.bucket='%s', input.key='%s'",
*input.Bucket, *input.Key)
}
defer result.Body.Close()
data, err := io.ReadAll(result.Body)
if err != nil {
return nil, errors.Trace(err)
var (
data []byte
readErr error
)
for retryCnt := 0; retryCnt < maxErrorRetries; retryCnt += 1 {
input := &s3.GetObjectInput{
Bucket: aws.String(rs.options.Bucket),
Key: aws.String(rs.options.Prefix + file),
}
result, err := rs.svc.GetObjectWithContext(ctx, input)
if err != nil {
return nil, errors.Annotatef(err,
"failed to read s3 file, file info: input.bucket='%s', input.key='%s'",
*input.Bucket, *input.Key)
}
data, readErr = io.ReadAll(result.Body)
// close the body of response since data has been already read out
result.Body.Close()
// for unit test
failpoint.Inject("read-s3-body-failed", func(_ failpoint.Value) {
log.Info("original error", zap.Error(readErr))
readErr = errors.Errorf("read: connection reset by peer")
})
if readErr != nil {
if isDeadlineExceedError(readErr) || isCancelError(readErr) {
return nil, errors.Annotatef(readErr, "failed to read body from get object result, file info: input.bucket='%s', input.key='%s', retryCnt='%d'",
*input.Bucket, *input.Key, retryCnt)
}
continue
}
return data, nil
}
return data, nil
// retry too much, should be failed
return nil, errors.Annotatef(readErr, "failed to read body from get object result (retry too much), file info: input.bucket='%s', input.key='%s'",
rs.options.Bucket, rs.options.Prefix+file)
}

// DeleteFile delete the file in s3 storage
Expand Down Expand Up @@ -1104,6 +1123,10 @@ type retryerWithLog struct {
client.DefaultRetryer
}

func isCancelError(err error) bool {
return strings.Contains(err.Error(), "context canceled")
}

func isDeadlineExceedError(err error) bool {
// TODO find a better way.
// Known challenges:
Expand Down
38 changes: 38 additions & 0 deletions br/pkg/storage/s3_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"net/http"
"net/http/httptest"
"os"
"strings"
"sync"
"testing"

Expand Down Expand Up @@ -1383,3 +1384,40 @@ func TestRetryError(t *testing.T) {
require.NoError(t, err)
require.Equal(t, count, int32(2))
}

func TestS3ReadFileRetryable(t *testing.T) {
s := createS3Suite(t)
ctx := aws.BackgroundContext()
errMsg := "just some unrelated error"
expectedErr := errors.New(errMsg)

s.s3.EXPECT().
GetObjectWithContext(ctx, gomock.Any()).
DoAndReturn(func(_ context.Context, input *s3.GetObjectInput, opt ...request.Option) (*s3.GetObjectOutput, error) {
require.Equal(t, "bucket", aws.StringValue(input.Bucket))
require.Equal(t, "prefix/file", aws.StringValue(input.Key))
return &s3.GetObjectOutput{
Body: io.NopCloser(bytes.NewReader([]byte("test"))),
}, nil
})
s.s3.EXPECT().
GetObjectWithContext(ctx, gomock.Any()).
DoAndReturn(func(_ context.Context, input *s3.GetObjectInput, opt ...request.Option) (*s3.GetObjectOutput, error) {
require.Equal(t, "bucket", aws.StringValue(input.Bucket))
require.Equal(t, "prefix/file", aws.StringValue(input.Key))
return &s3.GetObjectOutput{
Body: io.NopCloser(bytes.NewReader([]byte("test"))),
}, nil
})
s.s3.EXPECT().
GetObjectWithContext(ctx, gomock.Any()).
Return(nil, expectedErr)

require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/br/pkg/storage/read-s3-body-failed", "2*return(true)"))
defer func() {
failpoint.Disable("github.com/pingcap/tidb/br/pkg/storage/read-s3-body-failed")
}()
_, err := s.storage.ReadFile(ctx, "file")
require.Error(t, err)
require.True(t, strings.Contains(err.Error(), errMsg))
}
2 changes: 1 addition & 1 deletion br/pkg/utils/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ go_test(
],
embed = [":utils"],
flaky = True,
shard_count = 33,
shard_count = 34,
deps = [
"//br/pkg/errors",
"//pkg/kv",
Expand Down
33 changes: 32 additions & 1 deletion br/pkg/utils/backoff.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"context"
"database/sql"
"io"
"strings"
"time"

"github.com/pingcap/errors"
Expand All @@ -26,6 +27,10 @@ const (
downloadSSTWaitInterval = 1 * time.Second
downloadSSTMaxWaitInterval = 4 * time.Second

backupSSTRetryTimes = 5
backupSSTWaitInterval = 2 * time.Second
backupSSTMaxWaitInterval = 3 * time.Second

resetTSRetryTime = 16
resetTSWaitInterval = 50 * time.Millisecond
resetTSMaxWaitInterval = 500 * time.Millisecond
Expand All @@ -42,8 +47,21 @@ const (
ChecksumRetryTime = 8
ChecksumWaitInterval = 1 * time.Second
ChecksumMaxWaitInterval = 30 * time.Second

gRPC_Cancel = "the client connection is closing"
)

// At least, there are two possible cancel() call,
// one from go context, another from gRPC, here we retry when gRPC cancel with connection closing
func isGRPCCancel(err error) bool {
if s, ok := status.FromError(err); ok {
if strings.Contains(s.Message(), gRPC_Cancel) {
return true
}
}
return false
}

// RetryState is the mutable state needed for retrying.
// It likes the `utils.Backoffer`, but more fundamental:
// this only control the backoff time and knows nothing about what error happens.
Expand Down Expand Up @@ -143,6 +161,11 @@ func NewDownloadSSTBackoffer() Backoffer {
return NewBackoffer(downloadSSTRetryTimes, downloadSSTWaitInterval, downloadSSTMaxWaitInterval, errContext)
}

func NewBackupSSTBackoffer() Backoffer {
errContext := NewErrorContext("backup sst", 3)
return NewBackoffer(backupSSTRetryTimes, backupSSTWaitInterval, backupSSTMaxWaitInterval, errContext)
}

func (bo *importerBackoffer) NextBackoff(err error) time.Duration {
log.Warn("retry to import ssts", zap.Int("attempt", bo.attempt), zap.Error(err))
// we don't care storeID here.
Expand All @@ -162,9 +185,17 @@ func (bo *importerBackoffer) NextBackoff(err error) time.Duration {
bo.attempt = 0
default:
switch status.Code(e) {
case codes.Unavailable, codes.Aborted, codes.DeadlineExceeded:
case codes.Unavailable, codes.Aborted, codes.DeadlineExceeded, codes.ResourceExhausted, codes.Internal:
bo.delayTime = 2 * bo.delayTime
bo.attempt--
case codes.Canceled:
if isGRPCCancel(err) {
bo.delayTime = 2 * bo.delayTime
bo.attempt--
} else {
bo.delayTime = 0
bo.attempt = 0
}
default:
// Unexpected error
bo.delayTime = 0
Expand Down
19 changes: 19 additions & 0 deletions br/pkg/utils/backoff_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -178,3 +178,22 @@ func TestNewDownloadSSTBackofferWithCancel(t *testing.T) {
context.Canceled,
}, multierr.Errors(err))
}

func TestNewBackupSSTBackofferWithCancel(t *testing.T) {
var counter int
backoffer := utils.NewBackupSSTBackoffer()
err := utils.WithRetry(context.Background(), func() error {
defer func() { counter++ }()
if counter == 3 {
return context.Canceled
}
return berrors.ErrKVIngestFailed
}, backoffer)
require.Equal(t, 4, counter)
require.Equal(t, []error{
berrors.ErrKVIngestFailed,
berrors.ErrKVIngestFailed,
berrors.ErrKVIngestFailed,
context.Canceled,
}, multierr.Errors(err))
}
1 change: 1 addition & 0 deletions br/pkg/utils/retry.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ var retryableServerError = []string{
"body write aborted",
"error during dispatch",
"put object timeout",
"timeout after",
"internalerror",
"not read from or written to within the timeout period",
"<code>requesttimeout</code>",
Expand Down
2 changes: 1 addition & 1 deletion br/tests/br_full/run.sh
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ test_log="${TEST_DIR}/${DB}_test.log"
error_str="not read from or written to within the timeout period"
unset BR_LOG_TO_TERM

export GO_FAILPOINTS="github.com/pingcap/tidb/br/pkg/backup/backup-storage-error=1*return(\"connection refused\")->1*return(\"InternalError\");github.com/pingcap/tidb/br/pkg/backup/backup-timeout-error=1*return(\"<Code>RequestTimeout</Code>\")->1*return(\"not read from or written to within the timeout period\")->1*return(\"<Code>InvalidPart</Code>\")->1*return(\"end of file before message length reached\")"
export GO_FAILPOINTS="github.com/pingcap/tidb/br/pkg/backup/backup-storage-error=1*return(\"connection refused\")->1*return(\"InternalError\");github.com/pingcap/tidb/br/pkg/backup/backup-timeout-error=1*return(\"<Code>RequestTimeout</Code>\")->1*return(\"not read from or written to within the timeout period\")->1*return(\"<Code>InvalidPart</Code>\")->1*return(\"end of file before message length reached\")->1*return(\"timeout after\")"
run_br --pd $PD_ADDR backup full -s "local://$TEST_DIR/$DB-lz4" --concurrency 4 --compression lz4 --log-file $test_log
export GO_FAILPOINTS=""
size_lz4=$(du -d 0 $TEST_DIR/$DB-lz4 | awk '{print $1}')
Expand Down

0 comments on commit d6ef1c7

Please sign in to comment.