Skip to content

Commit

Permalink
Merge branch 'master' into disksing/resource-group-wait-config
Browse files Browse the repository at this point in the history
  • Loading branch information
disksing committed Apr 10, 2024
2 parents 1d1b8f6 + 2b8116e commit 2fa25bc
Show file tree
Hide file tree
Showing 14 changed files with 583 additions and 48 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/pd-tests.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ jobs:
TOTAL_JOBS: ${{needs.chunks.outputs.job-total}}
run: for i in $(seq 1 $TOTAL_JOBS); do cat covprofile_$i >> covprofile; done
- name: Send coverage
uses: codecov/codecov-action@v1
uses: codecov/codecov-action@v4.2.0
with:
token: ${{ secrets.CODECOV }}
file: ./covprofile
Expand Down
22 changes: 16 additions & 6 deletions client/Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -12,20 +12,30 @@
# See the License for the specific language governing permissions and
# limitations under the License.

GO_TOOLS_BIN_PATH := $(shell pwd)/../.tools/bin
ROOT_PATH := $(shell pwd)/..
GO_TOOLS_BIN_PATH := $(ROOT_PATH)/.tools/bin
PATH := $(GO_TOOLS_BIN_PATH):$(PATH)
SHELL := env PATH='$(PATH)' GOBIN='$(GO_TOOLS_BIN_PATH)' $(shell which bash)

default: static tidy test

test:
CGO_ENABLE=1 go test ./... -race -cover
test: failpoint-enable
CGO_ENABLED=1 go test ./... -v -tags deadlock -race -cover || { $(MAKE) failpoint-disable && exit 1; }
$(MAKE) failpoint-disable

basic-test:
CGO_ENABLE=1 go test ./...
basic-test: failpoint-enable
CGO_ENABLED=1 go test ./... || { $(MAKE) failpoint-disable && exit 1; }
$(MAKE) failpoint-disable

ci-test-job:
CGO_ENABLED=1 go test ./... -race -covermode=atomic -coverprofile=covprofile -coverpkg=../... github.com/tikv/pd/client
if [ -f covprofile ]; then rm covprofile; fi
CGO_ENABLED=1 go test ./... -v -tags deadlock -race -cover -covermode=atomic -coverprofile=covprofile -coverpkg=../...

failpoint-enable:
cd $(ROOT_PATH) && $(MAKE) failpoint-enable

failpoint-disable:
cd $(ROOT_PATH) && $(MAKE) failpoint-disable

install-tools:
cd .. && $(MAKE) install-tools
Expand Down
45 changes: 42 additions & 3 deletions client/retry/backoff.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,15 +16,31 @@ package retry

import (
"context"
"reflect"
"runtime"
"strings"
"time"

"github.com/pingcap/errors"
"github.com/pingcap/failpoint"
"github.com/pingcap/log"
"go.uber.org/multierr"
"go.uber.org/zap"
)

const maxRecordErrorCount = 20

// Option is used to customize the backoffer.
type Option func(*Backoffer)

// withMinLogInterval sets the minimum log interval for retrying.
// Because the retry interval may be not the factor of log interval, so this is the minimum interval.
func withMinLogInterval(interval time.Duration) Option {
return func(bo *Backoffer) {
bo.logInterval = interval
}
}

// Backoffer is a backoff policy for retrying operations.
type Backoffer struct {
// base defines the initial time interval to wait before each retry.
Expand All @@ -36,6 +52,10 @@ type Backoffer struct {
// retryableChecker is used to check if the error is retryable.
// By default, all errors are retryable.
retryableChecker func(err error) bool
// logInterval defines the log interval for retrying.
logInterval time.Duration
// nextLogTime is used to record the next log time.
nextLogTime time.Duration

attempt int
next time.Duration
Expand All @@ -50,10 +70,12 @@ func (bo *Backoffer) Exec(
defer bo.resetBackoff()
var (
allErrors error
err error
after *time.Timer
)
fnName := getFunctionName(fn)
for {
err := fn()
err = fn()
bo.attempt++
if bo.attempt < maxRecordErrorCount {
// multierr.Append will ignore nil error.
Expand All @@ -63,6 +85,13 @@ func (bo *Backoffer) Exec(
break
}
currentInterval := bo.nextInterval()
bo.nextLogTime += currentInterval
if err != nil {
if bo.logInterval > 0 && bo.nextLogTime >= bo.logInterval {
bo.nextLogTime %= bo.logInterval
log.Warn("call PD API failed and retrying", zap.String("api", fnName), zap.Int("retry-time", bo.attempt), zap.Error(err))
}
}
if after == nil {
after = time.NewTimer(currentInterval)
} else {
Expand Down Expand Up @@ -93,7 +122,7 @@ func (bo *Backoffer) Exec(
// - `base` defines the initial time interval to wait before each retry.
// - `max` defines the max time interval to wait before each retry.
// - `total` defines the max total time duration cost in retrying. If it's 0, it means infinite retry until success.
func InitialBackoffer(base, max, total time.Duration) *Backoffer {
func InitialBackoffer(base, max, total time.Duration, opts ...Option) *Backoffer {
// Make sure the base is less than or equal to the max.
if base > max {
base = max
Expand All @@ -102,7 +131,7 @@ func InitialBackoffer(base, max, total time.Duration) *Backoffer {
if total > 0 && total < base {
total = base
}
return &Backoffer{
bo := &Backoffer{
base: base,
max: max,
total: total,
Expand All @@ -113,6 +142,10 @@ func InitialBackoffer(base, max, total time.Duration) *Backoffer {
currentTotal: 0,
attempt: 0,
}
for _, opt := range opts {
opt(bo)
}
return bo
}

// SetRetryableChecker sets the retryable checker.
Expand Down Expand Up @@ -152,6 +185,7 @@ func (bo *Backoffer) resetBackoff() {
bo.next = bo.base
bo.currentTotal = 0
bo.attempt = 0
bo.nextLogTime = 0
}

// Only used for test.
Expand All @@ -161,3 +195,8 @@ var testBackOffExecuteFlag = false
func TestBackOffExecute() bool {
return testBackOffExecuteFlag
}

func getFunctionName(f any) string {
strs := strings.Split(runtime.FuncForPC(reflect.ValueOf(f).Pointer()).Name(), ".")
return strings.Split(strs[len(strs)-1], "-")[0]
}
95 changes: 95 additions & 0 deletions client/retry/backoff_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,15 @@
package retry

import (
"bytes"
"context"
"errors"
"testing"
"time"

"github.com/pingcap/log"
"github.com/stretchr/testify/require"
"go.uber.org/zap"
)

func TestBackoffer(t *testing.T) {
Expand Down Expand Up @@ -107,3 +110,95 @@ func TestBackoffer(t *testing.T) {
func isBackofferReset(bo *Backoffer) bool {
return bo.next == bo.base && bo.currentTotal == 0
}

func TestBackofferWithLog(t *testing.T) {
re := require.New(t)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

conf := &log.Config{Level: "debug", File: log.FileLogConfig{}, DisableTimestamp: true}
lg := newZapTestLogger(conf)
log.ReplaceGlobals(lg.Logger, nil)

bo := InitialBackoffer(time.Millisecond*10, time.Millisecond*100, time.Millisecond*1000, withMinLogInterval(time.Millisecond*100))
err := bo.Exec(ctx, testFn)
re.ErrorIs(err, errTest)

ms := lg.Messages()
len1 := len(ms)
// 10 + 20 + 40 + 80(log) + 100(log) * 9 >= 1000, so log ten times.
re.Len(ms, 10)
// 10 + 20 + 40 + 80 + 100 * 9, 13 times retry.
rfc := `["call PD API failed and retrying"] [api=testFn] [retry-time=13] [error=test]`
re.Contains(ms[len(ms)-1], rfc)
// 10 + 20 + 40 + 80(log), 4 times retry.
rfc = `["call PD API failed and retrying"] [api=testFn] [retry-time=4] [error=test]`
re.Contains(ms[0], rfc)

bo.resetBackoff()
err = bo.Exec(ctx, testFn)
re.ErrorIs(err, errTest)

ms = lg.Messages()
re.Len(ms, 20)
rfc = `["call PD API failed and retrying"] [api=testFn] [retry-time=13] [error=test]`
re.Contains(ms[len(ms)-1], rfc)
rfc = `["call PD API failed and retrying"] [api=testFn] [retry-time=4] [error=test]`
re.Contains(ms[len1], rfc)
}

var errTest = errors.New("test")

func testFn() error {
return errTest
}

// testingWriter is a WriteSyncer that writes the the messages.
type testingWriter struct {
messages []string
}

func newTestingWriter() *testingWriter {
return &testingWriter{}
}

func (w *testingWriter) Write(p []byte) (n int, err error) {
n = len(p)
p = bytes.TrimRight(p, "\n")
m := string(p)
w.messages = append(w.messages, m)
return n, nil
}
func (w *testingWriter) Sync() error {
return nil
}

type verifyLogger struct {
*zap.Logger
w *testingWriter
}

func (logger *verifyLogger) Message() string {
if logger.w.messages == nil {
return ""
}
return logger.w.messages[len(logger.w.messages)-1]
}

func (logger *verifyLogger) Messages() []string {
if logger.w.messages == nil {
return nil
}
return logger.w.messages
}

func newZapTestLogger(cfg *log.Config, opts ...zap.Option) verifyLogger {
// TestingWriter is used to write to memory.
// Used in the verify logger.
writer := newTestingWriter()
lg, _, _ := log.InitLoggerWithWriteSyncer(cfg, writer, writer, opts...)
return verifyLogger{
Logger: lg,
w: writer,
}
}
Loading

0 comments on commit 2fa25bc

Please sign in to comment.