From 80a2fd907c9c78fe2d3b4dc77ebf6332a5e6d1df Mon Sep 17 00:00:00 2001 From: Yongbo Jiang Date: Tue, 9 Apr 2024 16:19:51 +0800 Subject: [PATCH 1/5] retry, client: support to log after execing some times (#7895) ref tikv/pd#7894 Signed-off-by: Cabinfever_B Co-authored-by: ti-chi-bot[bot] <108142056+ti-chi-bot[bot]@users.noreply.github.com> --- client/retry/backoff.go | 45 +++++++++++++++-- client/retry/backoff_test.go | 95 ++++++++++++++++++++++++++++++++++++ 2 files changed, 137 insertions(+), 3 deletions(-) diff --git a/client/retry/backoff.go b/client/retry/backoff.go index 6c293098971..580e466badb 100644 --- a/client/retry/backoff.go +++ b/client/retry/backoff.go @@ -16,15 +16,31 @@ package retry import ( "context" + "reflect" + "runtime" + "strings" "time" "github.com/pingcap/errors" "github.com/pingcap/failpoint" + "github.com/pingcap/log" "go.uber.org/multierr" + "go.uber.org/zap" ) const maxRecordErrorCount = 20 +// Option is used to customize the backoffer. +type Option func(*Backoffer) + +// withMinLogInterval sets the minimum log interval for retrying. +// Because the retry interval may be not the factor of log interval, so this is the minimum interval. +func withMinLogInterval(interval time.Duration) Option { + return func(bo *Backoffer) { + bo.logInterval = interval + } +} + // Backoffer is a backoff policy for retrying operations. type Backoffer struct { // base defines the initial time interval to wait before each retry. @@ -36,6 +52,10 @@ type Backoffer struct { // retryableChecker is used to check if the error is retryable. // By default, all errors are retryable. retryableChecker func(err error) bool + // logInterval defines the log interval for retrying. + logInterval time.Duration + // nextLogTime is used to record the next log time. + nextLogTime time.Duration attempt int next time.Duration @@ -50,10 +70,12 @@ func (bo *Backoffer) Exec( defer bo.resetBackoff() var ( allErrors error + err error after *time.Timer ) + fnName := getFunctionName(fn) for { - err := fn() + err = fn() bo.attempt++ if bo.attempt < maxRecordErrorCount { // multierr.Append will ignore nil error. @@ -63,6 +85,13 @@ func (bo *Backoffer) Exec( break } currentInterval := bo.nextInterval() + bo.nextLogTime += currentInterval + if err != nil { + if bo.logInterval > 0 && bo.nextLogTime >= bo.logInterval { + bo.nextLogTime %= bo.logInterval + log.Warn("call PD API failed and retrying", zap.String("api", fnName), zap.Int("retry-time", bo.attempt), zap.Error(err)) + } + } if after == nil { after = time.NewTimer(currentInterval) } else { @@ -93,7 +122,7 @@ func (bo *Backoffer) Exec( // - `base` defines the initial time interval to wait before each retry. // - `max` defines the max time interval to wait before each retry. // - `total` defines the max total time duration cost in retrying. If it's 0, it means infinite retry until success. -func InitialBackoffer(base, max, total time.Duration) *Backoffer { +func InitialBackoffer(base, max, total time.Duration, opts ...Option) *Backoffer { // Make sure the base is less than or equal to the max. if base > max { base = max @@ -102,7 +131,7 @@ func InitialBackoffer(base, max, total time.Duration) *Backoffer { if total > 0 && total < base { total = base } - return &Backoffer{ + bo := &Backoffer{ base: base, max: max, total: total, @@ -113,6 +142,10 @@ func InitialBackoffer(base, max, total time.Duration) *Backoffer { currentTotal: 0, attempt: 0, } + for _, opt := range opts { + opt(bo) + } + return bo } // SetRetryableChecker sets the retryable checker. @@ -152,6 +185,7 @@ func (bo *Backoffer) resetBackoff() { bo.next = bo.base bo.currentTotal = 0 bo.attempt = 0 + bo.nextLogTime = 0 } // Only used for test. @@ -161,3 +195,8 @@ var testBackOffExecuteFlag = false func TestBackOffExecute() bool { return testBackOffExecuteFlag } + +func getFunctionName(f any) string { + strs := strings.Split(runtime.FuncForPC(reflect.ValueOf(f).Pointer()).Name(), ".") + return strings.Split(strs[len(strs)-1], "-")[0] +} diff --git a/client/retry/backoff_test.go b/client/retry/backoff_test.go index 32a42d083bd..c877860b5ae 100644 --- a/client/retry/backoff_test.go +++ b/client/retry/backoff_test.go @@ -15,12 +15,15 @@ package retry import ( + "bytes" "context" "errors" "testing" "time" + "github.com/pingcap/log" "github.com/stretchr/testify/require" + "go.uber.org/zap" ) func TestBackoffer(t *testing.T) { @@ -107,3 +110,95 @@ func TestBackoffer(t *testing.T) { func isBackofferReset(bo *Backoffer) bool { return bo.next == bo.base && bo.currentTotal == 0 } + +func TestBackofferWithLog(t *testing.T) { + re := require.New(t) + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + conf := &log.Config{Level: "debug", File: log.FileLogConfig{}, DisableTimestamp: true} + lg := newZapTestLogger(conf) + log.ReplaceGlobals(lg.Logger, nil) + + bo := InitialBackoffer(time.Millisecond*10, time.Millisecond*100, time.Millisecond*1000, withMinLogInterval(time.Millisecond*100)) + err := bo.Exec(ctx, testFn) + re.ErrorIs(err, errTest) + + ms := lg.Messages() + len1 := len(ms) + // 10 + 20 + 40 + 80(log) + 100(log) * 9 >= 1000, so log ten times. + re.Len(ms, 10) + // 10 + 20 + 40 + 80 + 100 * 9, 13 times retry. + rfc := `["call PD API failed and retrying"] [api=testFn] [retry-time=13] [error=test]` + re.Contains(ms[len(ms)-1], rfc) + // 10 + 20 + 40 + 80(log), 4 times retry. + rfc = `["call PD API failed and retrying"] [api=testFn] [retry-time=4] [error=test]` + re.Contains(ms[0], rfc) + + bo.resetBackoff() + err = bo.Exec(ctx, testFn) + re.ErrorIs(err, errTest) + + ms = lg.Messages() + re.Len(ms, 20) + rfc = `["call PD API failed and retrying"] [api=testFn] [retry-time=13] [error=test]` + re.Contains(ms[len(ms)-1], rfc) + rfc = `["call PD API failed and retrying"] [api=testFn] [retry-time=4] [error=test]` + re.Contains(ms[len1], rfc) +} + +var errTest = errors.New("test") + +func testFn() error { + return errTest +} + +// testingWriter is a WriteSyncer that writes the the messages. +type testingWriter struct { + messages []string +} + +func newTestingWriter() *testingWriter { + return &testingWriter{} +} + +func (w *testingWriter) Write(p []byte) (n int, err error) { + n = len(p) + p = bytes.TrimRight(p, "\n") + m := string(p) + w.messages = append(w.messages, m) + return n, nil +} +func (w *testingWriter) Sync() error { + return nil +} + +type verifyLogger struct { + *zap.Logger + w *testingWriter +} + +func (logger *verifyLogger) Message() string { + if logger.w.messages == nil { + return "" + } + return logger.w.messages[len(logger.w.messages)-1] +} + +func (logger *verifyLogger) Messages() []string { + if logger.w.messages == nil { + return nil + } + return logger.w.messages +} + +func newZapTestLogger(cfg *log.Config, opts ...zap.Option) verifyLogger { + // TestingWriter is used to write to memory. + // Used in the verify logger. + writer := newTestingWriter() + lg, _, _ := log.InitLoggerWithWriteSyncer(cfg, writer, writer, opts...) + return verifyLogger{ + Logger: lg, + w: writer, + } +} From ad3387c37018683d8082539b7337fe0170e75cb9 Mon Sep 17 00:00:00 2001 From: Hu# Date: Tue, 9 Apr 2024 16:45:21 +0800 Subject: [PATCH 2/5] scripts: fix client and tools cover profile (#8020) ref tikv/pd#4399 Signed-off-by: husharp --- client/Makefile | 20 +++++++++++++++----- scripts/ci-subtask.sh | 14 +++++++------- tests/integrations/Makefile | 5 +++-- tests/integrations/realcluster/Makefile | 2 +- tools/Makefile | 8 ++++++-- 5 files changed, 32 insertions(+), 17 deletions(-) diff --git a/client/Makefile b/client/Makefile index dae53222d92..89c4936d179 100644 --- a/client/Makefile +++ b/client/Makefile @@ -12,20 +12,30 @@ # See the License for the specific language governing permissions and # limitations under the License. +ROOT_PATH := $(shell pwd)/.. GO_TOOLS_BIN_PATH := $(shell pwd)/../.tools/bin PATH := $(GO_TOOLS_BIN_PATH):$(PATH) SHELL := env PATH='$(PATH)' GOBIN='$(GO_TOOLS_BIN_PATH)' $(shell which bash) default: static tidy test -test: - CGO_ENABLE=1 go test ./... -race -cover +test: failpoint-enable + CGO_ENABLE=1 go test ./... -v -tags deadlock -race -cover || { $(MAKE) failpoint-disable && exit 1; } + $(MAKE) failpoint-disable -basic-test: - CGO_ENABLE=1 go test ./... +basic-test: failpoint-enable + CGO_ENABLE=1 go test ./... || { $(MAKE) failpoint-disable && exit 1; } + $(MAKE) failpoint-disable ci-test-job: - CGO_ENABLED=1 go test ./... -race -covermode=atomic -coverprofile=covprofile -coverpkg=../... github.com/tikv/pd/client + if [ -f covprofile ]; then rm covprofile; fi + CGO_ENABLED=1 go test ./... -v -tags deadlock -race -cover -covermode=atomic -coverprofile=covprofile -coverpkg=../... + +failpoint-enable: + cd $(ROOT_PATH) && $(MAKE) failpoint-enable + +failpoint-disable: + cd $(ROOT_PATH) && $(MAKE) failpoint-disable install-tools: cd .. && $(MAKE) install-tools diff --git a/scripts/ci-subtask.sh b/scripts/ci-subtask.sh index b9006dda503..effd250965f 100755 --- a/scripts/ci-subtask.sh +++ b/scripts/ci-subtask.sh @@ -2,12 +2,12 @@ # ./ci-subtask.sh -ROOT_PATH=../../ +ROOT_PATH_COV=$(pwd)/covprofile if [[ $2 -gt 9 ]]; then # run tools tests if [[ $2 -eq 10 ]]; then - cd ./tools && make ci-test-job && cd .. && cat ./covprofile >> covprofile || exit 1 + cd ./tools && make ci-test-job && cat covprofile >> $ROOT_PATH_COV || exit 1 exit fi @@ -16,12 +16,12 @@ if [[ $2 -gt 9 ]]; then integrations_tasks=($(find "$integrations_dir" -mindepth 1 -maxdepth 1 -type d)) for t in "${integrations_tasks[@]}"; do if [[ "$t" = "$integrations_dir/client" && $2 -eq 11 ]]; then - cd ./client && make ci-test-job && cd .. && cat ./covprofile >> covprofile || exit 1 - cd $integrations_dir && make ci-test-job test_name=client && cat ./client/covprofile >> "$ROOT_PATH/covprofile" || exit 1 + cd ./client && make ci-test-job && cat covprofile >> $ROOT_PATH_COV && cd .. || exit 1 + cd $integrations_dir && make ci-test-job test_name=client && cat ./client/covprofile >> $ROOT_PATH_COV || exit 1 elif [[ "$t" = "$integrations_dir/tso" && $2 -eq 12 ]]; then - cd $integrations_dir && make ci-test-job test_name=tso && cat ./tso/covprofile >> "$ROOT_PATH/covprofile" || exit 1 + cd $integrations_dir && make ci-test-job test_name=tso && cat ./tso/covprofile >> $ROOT_PATH_COV || exit 1 elif [[ "$t" = "$integrations_dir/mcs" && $2 -eq 13 ]]; then - cd $integrations_dir && make ci-test-job test_name=mcs && cat ./mcs/covprofile >> "$ROOT_PATH/covprofile" || exit 1 + cd $integrations_dir && make ci-test-job test_name=mcs && cat ./mcs/covprofile >> $ROOT_PATH_COV || exit 1 fi done else @@ -61,5 +61,5 @@ else [[ $(($min_i + 1)) -eq $2 ]] && res+=($t) done - CGO_ENABLED=1 go test -timeout=15m -tags deadlock -race -covermode=atomic -coverprofile=covprofile -coverpkg=./... ${res[@]} + CGO_ENABLED=1 go test -timeout=15m -tags deadlock -race -covermode=atomic -coverprofile=$ROOT_PATH_COV -coverpkg=./... ${res[@]} fi diff --git a/tests/integrations/Makefile b/tests/integrations/Makefile index 43b5945baca..b43ecabce7e 100644 --- a/tests/integrations/Makefile +++ b/tests/integrations/Makefile @@ -12,7 +12,7 @@ # See the License for the specific language governing permissions and # limitations under the License. -ROOT_PATH := ../.. +ROOT_PATH := $(shell pwd)/../.. GO_TOOLS_BIN_PATH := $(ROOT_PATH)/.tools/bin PATH := $(GO_TOOLS_BIN_PATH):$(PATH) SHELL := env PATH='$(PATH)' GOBIN='$(GO_TOOLS_BIN_PATH)' $(shell which bash) @@ -35,7 +35,8 @@ test: failpoint-enable $(MAKE) failpoint-disable ci-test-job: - CGO_ENABLED=1 go test ./$(value test_name)/... -v -tags deadlock -race -covermode=atomic -coverprofile=./$(value test_name)/covprofile -coverpkg=$(ROOT_PATH)/... + if [ -f covprofile ]; then rm ./$(value test_name)/covprofile; fi + CGO_ENABLED=1 go test ./$(value test_name)/... -v -tags deadlock -race -cover -covermode=atomic -coverprofile=./$(value test_name)/covprofile -coverpkg=$(ROOT_PATH)/... install-tools: cd $(ROOT_PATH) && $(MAKE) install-tools diff --git a/tests/integrations/realcluster/Makefile b/tests/integrations/realcluster/Makefile index 4817b94b5da..278f585feaa 100644 --- a/tests/integrations/realcluster/Makefile +++ b/tests/integrations/realcluster/Makefile @@ -12,7 +12,7 @@ # See the License for the specific language governing permissions and # limitations under the License. -ROOT_PATH := ../../.. +ROOT_PATH := $(shell pwd)/../../.. GO_TOOLS_BIN_PATH := $(ROOT_PATH)/.tools/bin PATH := $(GO_TOOLS_BIN_PATH):$(PATH) SHELL := env PATH='$(PATH)' GOBIN='$(GO_TOOLS_BIN_PATH)' $(shell which bash) diff --git a/tools/Makefile b/tools/Makefile index 052f8573b62..336cc536949 100644 --- a/tools/Makefile +++ b/tools/Makefile @@ -12,7 +12,7 @@ # See the License for the specific language governing permissions and # limitations under the License. -ROOT_PATH := .. +ROOT_PATH := $(shell pwd)/.. GO_TOOLS_BIN_PATH := $(ROOT_PATH)/.tools/bin PATH := $(GO_TOOLS_BIN_PATH):$(PATH) SHELL := env PATH='$(PATH)' GOBIN='$(GO_TOOLS_BIN_PATH)' $(shell which bash) @@ -33,10 +33,14 @@ tidy: git diff go.mod go.sum | cat git diff --quiet go.mod go.sum -ci-test-job: failpoint-enable +test: failpoint-enable CGO_ENABLED=1 go test ./... -v -tags deadlock -race -cover || { $(MAKE) failpoint-disable && exit 1; } $(MAKE) failpoint-disable +ci-test-job: + if [ -f covprofile ]; then rm covprofile; fi + CGO_ENABLED=1 go test ./... -v -tags deadlock -race -cover -covermode=atomic -coverprofile=covprofile -coverpkg=../... + failpoint-enable: cd $(ROOT_PATH) && $(MAKE) failpoint-enable From 35e8e9554a9efa95230409d22344ab304121502a Mon Sep 17 00:00:00 2001 From: ShuNing Date: Tue, 9 Apr 2024 18:19:51 +0800 Subject: [PATCH 3/5] pkg/ratelimit: introduce an executor that can run with a limiter (#8024) ref tikv/pd#7897 pkg/ratelimit: Introduce an executor that can run with a limiter - async runner can run the task with the limiter, the task will parallel run. - sync runner used to keep the behavior like before Signed-off-by: nolouch Co-authored-by: ti-chi-bot[bot] <108142056+ti-chi-bot[bot]@users.noreply.github.com> --- pkg/ratelimit/concurrency_limiter.go | 99 +++++++++++-- pkg/ratelimit/concurrency_limiter_test.go | 82 ++++++++++- pkg/ratelimit/limiter.go | 10 +- pkg/ratelimit/runner.go | 168 ++++++++++++++++++++++ pkg/ratelimit/runner_test.go | 75 ++++++++++ 5 files changed, 410 insertions(+), 24 deletions(-) create mode 100644 pkg/ratelimit/runner.go create mode 100644 pkg/ratelimit/runner_test.go diff --git a/pkg/ratelimit/concurrency_limiter.go b/pkg/ratelimit/concurrency_limiter.go index b1eef3c8101..af768461478 100644 --- a/pkg/ratelimit/concurrency_limiter.go +++ b/pkg/ratelimit/concurrency_limiter.go @@ -14,24 +14,33 @@ package ratelimit -import "github.com/tikv/pd/pkg/utils/syncutil" +import ( + "context" -type concurrencyLimiter struct { - mu syncutil.RWMutex + "github.com/tikv/pd/pkg/utils/syncutil" +) + +// ConcurrencyLimiter is a limiter that limits the number of concurrent tasks. +type ConcurrencyLimiter struct { + mu syncutil.Mutex current uint64 + waiting uint64 limit uint64 // statistic maxLimit uint64 + queue chan *TaskToken } -func newConcurrencyLimiter(limit uint64) *concurrencyLimiter { - return &concurrencyLimiter{limit: limit} +// NewConcurrencyLimiter creates a new ConcurrencyLimiter. +func NewConcurrencyLimiter(limit uint64) *ConcurrencyLimiter { + return &ConcurrencyLimiter{limit: limit, queue: make(chan *TaskToken, limit)} } const unlimit = uint64(0) -func (l *concurrencyLimiter) allow() bool { +// old interface. only used in the ratelimiter package. +func (l *ConcurrencyLimiter) allow() bool { l.mu.Lock() defer l.mu.Unlock() @@ -45,7 +54,8 @@ func (l *concurrencyLimiter) allow() bool { return false } -func (l *concurrencyLimiter) release() { +// old interface. only used in the ratelimiter package. +func (l *ConcurrencyLimiter) release() { l.mu.Lock() defer l.mu.Unlock() @@ -54,28 +64,32 @@ func (l *concurrencyLimiter) release() { } } -func (l *concurrencyLimiter) getLimit() uint64 { - l.mu.RLock() - defer l.mu.RUnlock() +// old interface. only used in the ratelimiter package. +func (l *ConcurrencyLimiter) getLimit() uint64 { + l.mu.Lock() + defer l.mu.Unlock() return l.limit } -func (l *concurrencyLimiter) setLimit(limit uint64) { +// old interface. only used in the ratelimiter package. +func (l *ConcurrencyLimiter) setLimit(limit uint64) { l.mu.Lock() defer l.mu.Unlock() l.limit = limit } -func (l *concurrencyLimiter) getCurrent() uint64 { - l.mu.RLock() - defer l.mu.RUnlock() +// GetRunningTasksNum returns the number of running tasks. +func (l *ConcurrencyLimiter) GetRunningTasksNum() uint64 { + l.mu.Lock() + defer l.mu.Unlock() return l.current } -func (l *concurrencyLimiter) getMaxConcurrency() uint64 { +// old interface. only used in the ratelimiter package. +func (l *ConcurrencyLimiter) getMaxConcurrency() uint64 { l.mu.Lock() defer func() { l.maxLimit = l.current @@ -84,3 +98,58 @@ func (l *concurrencyLimiter) getMaxConcurrency() uint64 { return l.maxLimit } + +// GetWaitingTasksNum returns the number of waiting tasks. +func (l *ConcurrencyLimiter) GetWaitingTasksNum() uint64 { + l.mu.Lock() + defer l.mu.Unlock() + return l.waiting +} + +// Acquire acquires a token from the limiter. which will block until a token is available or ctx is done, like Timeout. +func (l *ConcurrencyLimiter) Acquire(ctx context.Context) (*TaskToken, error) { + l.mu.Lock() + if l.current >= l.limit { + l.waiting++ + l.mu.Unlock() + // block the waiting task on the caller goroutine + select { + case <-ctx.Done(): + l.mu.Lock() + l.waiting-- + l.mu.Unlock() + return nil, ctx.Err() + case token := <-l.queue: + l.mu.Lock() + token.released = false + l.current++ + l.waiting-- + l.mu.Unlock() + return token, nil + } + } + l.current++ + token := &TaskToken{limiter: l} + l.mu.Unlock() + return token, nil +} + +// TaskToken is a token that must be released after the task is done. +type TaskToken struct { + released bool + limiter *ConcurrencyLimiter +} + +// Release releases the token. +func (tt *TaskToken) Release() { + tt.limiter.mu.Lock() + defer tt.limiter.mu.Unlock() + if tt.released { + return + } + tt.released = true + tt.limiter.current-- + if len(tt.limiter.queue) < int(tt.limiter.limit) { + tt.limiter.queue <- tt + } +} diff --git a/pkg/ratelimit/concurrency_limiter_test.go b/pkg/ratelimit/concurrency_limiter_test.go index 5fe03740394..e77c79c8ebc 100644 --- a/pkg/ratelimit/concurrency_limiter_test.go +++ b/pkg/ratelimit/concurrency_limiter_test.go @@ -15,7 +15,12 @@ package ratelimit import ( + "context" + "fmt" + "sync" + "sync/atomic" "testing" + "time" "github.com/stretchr/testify/require" ) @@ -23,7 +28,7 @@ import ( func TestConcurrencyLimiter(t *testing.T) { t.Parallel() re := require.New(t) - cl := newConcurrencyLimiter(10) + cl := NewConcurrencyLimiter(10) for i := 0; i < 10; i++ { re.True(cl.allow()) } @@ -35,9 +40,9 @@ func TestConcurrencyLimiter(t *testing.T) { re.Equal(uint64(10), cl.getMaxConcurrency()) cl.setLimit(5) re.Equal(uint64(5), cl.getLimit()) - re.Equal(uint64(10), cl.getCurrent()) + re.Equal(uint64(10), cl.GetRunningTasksNum()) cl.release() - re.Equal(uint64(9), cl.getCurrent()) + re.Equal(uint64(9), cl.GetRunningTasksNum()) for i := 0; i < 9; i++ { cl.release() } @@ -45,10 +50,79 @@ func TestConcurrencyLimiter(t *testing.T) { for i := 0; i < 5; i++ { re.True(cl.allow()) } - re.Equal(uint64(5), cl.getCurrent()) + re.Equal(uint64(5), cl.GetRunningTasksNum()) for i := 0; i < 5; i++ { cl.release() } re.Equal(uint64(5), cl.getMaxConcurrency()) re.Equal(uint64(0), cl.getMaxConcurrency()) } + +func TestConcurrencyLimiter2(t *testing.T) { + limit := uint64(2) + limiter := NewConcurrencyLimiter(limit) + + require.Equal(t, uint64(0), limiter.GetRunningTasksNum(), "Expected running tasks to be 0") + require.Equal(t, uint64(0), limiter.GetWaitingTasksNum(), "Expected waiting tasks to be 0") + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + // Acquire two tokens + token1, err := limiter.Acquire(ctx) + require.NoError(t, err, "Failed to acquire token") + + token2, err := limiter.Acquire(ctx) + require.NoError(t, err, "Failed to acquire token") + + require.Equal(t, limit, limiter.GetRunningTasksNum(), "Expected running tasks to be 2") + + // Try to acquire third token, it should not be able to acquire immediately due to limit + go func() { + _, err := limiter.Acquire(ctx) + require.NoError(t, err, "Failed to acquire token") + }() + + time.Sleep(100 * time.Millisecond) // Give some time for the goroutine to run + require.Equal(t, uint64(1), limiter.GetWaitingTasksNum(), "Expected waiting tasks to be 1") + + // Release a token + token1.Release() + time.Sleep(100 * time.Millisecond) // Give some time for the goroutine to run + require.Equal(t, uint64(2), limiter.GetRunningTasksNum(), "Expected running tasks to be 2") + require.Equal(t, uint64(0), limiter.GetWaitingTasksNum(), "Expected waiting tasks to be 0") + + // Release the second token + token2.Release() + time.Sleep(100 * time.Millisecond) // Give some time for the goroutine to run + require.Equal(t, uint64(1), limiter.GetRunningTasksNum(), "Expected running tasks to be 1") +} + +func TestConcurrencyLimiterAcquire(t *testing.T) { + ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second) + defer cancel() + + limiter := NewConcurrencyLimiter(20) + sum := int64(0) + start := time.Now() + wg := &sync.WaitGroup{} + wg.Add(100) + for i := 0; i < 100; i++ { + go func(i int) { + defer wg.Done() + token, err := limiter.Acquire(ctx) + if err != nil { + fmt.Printf("Task %d failed to acquire: %v\n", i, err) + return + } + defer token.Release() + // simulate takes some time + time.Sleep(10 * time.Millisecond) + atomic.AddInt64(&sum, 1) + }(i) + } + wg.Wait() + // We should have 20 tasks running concurrently, so it should take at least 50ms to complete + require.Greater(t, time.Since(start).Milliseconds(), int64(50)) + require.Equal(t, int64(100), sum) +} diff --git a/pkg/ratelimit/limiter.go b/pkg/ratelimit/limiter.go index dc744d9ac1b..eaf6acf7c17 100644 --- a/pkg/ratelimit/limiter.go +++ b/pkg/ratelimit/limiter.go @@ -36,18 +36,18 @@ type DimensionConfig struct { type limiter struct { mu syncutil.RWMutex - concurrency *concurrencyLimiter + concurrency *ConcurrencyLimiter rate *RateLimiter } func newLimiter() *limiter { lim := &limiter{ - concurrency: newConcurrencyLimiter(0), + concurrency: NewConcurrencyLimiter(0), } return lim } -func (l *limiter) getConcurrencyLimiter() *concurrencyLimiter { +func (l *limiter) getConcurrencyLimiter() *ConcurrencyLimiter { l.mu.RLock() defer l.mu.RUnlock() return l.concurrency @@ -81,7 +81,7 @@ func (l *limiter) getQPSLimiterStatus() (limit rate.Limit, burst int) { func (l *limiter) getConcurrencyLimiterStatus() (limit uint64, current uint64) { baseLimiter := l.getConcurrencyLimiter() if baseLimiter != nil { - return baseLimiter.getLimit(), baseLimiter.getCurrent() + return baseLimiter.getLimit(), baseLimiter.GetRunningTasksNum() } return 0, 0 } @@ -101,7 +101,7 @@ func (l *limiter) updateConcurrencyConfig(limit uint64) UpdateStatus { } l.concurrency.setLimit(limit) } else { - l.concurrency = newConcurrencyLimiter(limit) + l.concurrency = NewConcurrencyLimiter(limit) } return ConcurrencyChanged } diff --git a/pkg/ratelimit/runner.go b/pkg/ratelimit/runner.go new file mode 100644 index 00000000000..661668af3b9 --- /dev/null +++ b/pkg/ratelimit/runner.go @@ -0,0 +1,168 @@ +// Copyright 2024 TiKV Project Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package ratelimit + +import ( + "context" + "errors" + "sync" + "time" + + "github.com/pingcap/log" + "go.uber.org/zap" +) + +const initialCapacity = 100 + +// Runner is the interface for running tasks. +type Runner interface { + RunTask(ctx context.Context, opt TaskOpts, f func(context.Context)) error +} + +// Task is a task to be run. +type Task struct { + Ctx context.Context + Opts TaskOpts + f func(context.Context) + submittedAt time.Time +} + +// ErrMaxWaitingTasksExceeded is returned when the number of waiting tasks exceeds the maximum. +var ErrMaxWaitingTasksExceeded = errors.New("max waiting tasks exceeded") + +// AsyncRunner is a simple task runner that limits the number of concurrent tasks. +type AsyncRunner struct { + name string + maxPendingDuration time.Duration + taskChan chan *Task + pendingTasks []*Task + pendingMu sync.Mutex + stopChan chan struct{} + wg sync.WaitGroup +} + +// NewAsyncRunner creates a new AsyncRunner. +func NewAsyncRunner(name string, maxPendingDuration time.Duration) *AsyncRunner { + s := &AsyncRunner{ + name: name, + maxPendingDuration: maxPendingDuration, + taskChan: make(chan *Task), + pendingTasks: make([]*Task, 0, initialCapacity), + stopChan: make(chan struct{}), + } + s.Start() + return s +} + +// TaskOpts is the options for RunTask. +type TaskOpts struct { + // TaskName is a human-readable name for the operation. TODO: metrics by name. + TaskName string + Limit *ConcurrencyLimiter +} + +// Start starts the runner. +func (s *AsyncRunner) Start() { + s.wg.Add(1) + go func() { + defer s.wg.Done() + for { + select { + case task := <-s.taskChan: + if task.Opts.Limit != nil { + token, err := task.Opts.Limit.Acquire(context.Background()) + if err != nil { + log.Error("failed to acquire semaphore", zap.String("task-name", task.Opts.TaskName), zap.Error(err)) + continue + } + go s.run(task.Ctx, task.f, token) + } else { + go s.run(task.Ctx, task.f, nil) + } + case <-s.stopChan: + log.Info("stopping async task runner", zap.String("name", s.name)) + return + } + } + }() +} + +func (s *AsyncRunner) run(ctx context.Context, task func(context.Context), token *TaskToken) { + task(ctx) + if token != nil { + token.Release() + s.processPendingTasks() + } +} + +func (s *AsyncRunner) processPendingTasks() { + s.pendingMu.Lock() + defer s.pendingMu.Unlock() + for len(s.pendingTasks) > 0 { + task := s.pendingTasks[0] + select { + case s.taskChan <- task: + s.pendingTasks = s.pendingTasks[1:] + return + default: + return + } + } +} + +// Stop stops the runner. +func (s *AsyncRunner) Stop() { + close(s.stopChan) + s.wg.Wait() +} + +// RunTask runs the task asynchronously. +func (s *AsyncRunner) RunTask(ctx context.Context, opt TaskOpts, f func(context.Context)) error { + task := &Task{ + Ctx: ctx, + Opts: opt, + f: f, + } + s.processPendingTasks() + select { + case s.taskChan <- task: + default: + s.pendingMu.Lock() + defer s.pendingMu.Unlock() + if len(s.pendingTasks) > 0 { + maxWait := time.Since(s.pendingTasks[0].submittedAt) + if maxWait > s.maxPendingDuration { + return ErrMaxWaitingTasksExceeded + } + } + task.submittedAt = time.Now() + s.pendingTasks = append(s.pendingTasks, task) + } + return nil +} + +// SyncRunner is a simple task runner that limits the number of concurrent tasks. +type SyncRunner struct{} + +// NewSyncRunner creates a new SyncRunner. +func NewSyncRunner() *SyncRunner { + return &SyncRunner{} +} + +// RunTask runs the task synchronously. +func (s *SyncRunner) RunTask(ctx context.Context, opt TaskOpts, f func(context.Context)) error { + f(ctx) + return nil +} diff --git a/pkg/ratelimit/runner_test.go b/pkg/ratelimit/runner_test.go new file mode 100644 index 00000000000..8a9eff77379 --- /dev/null +++ b/pkg/ratelimit/runner_test.go @@ -0,0 +1,75 @@ +// Copyright 2024 TiKV Project Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package ratelimit + +import ( + "context" + "sync" + "testing" + "time" + + "github.com/stretchr/testify/require" +) + +func TestAsyncRunner(t *testing.T) { + t.Run("RunTask", func(t *testing.T) { + limiter := NewConcurrencyLimiter(1) + runner := NewAsyncRunner("test", time.Second) + defer runner.Stop() + + var wg sync.WaitGroup + for i := 0; i < 10; i++ { + time.Sleep(50 * time.Millisecond) + wg.Add(1) + err := runner.RunTask(context.Background(), TaskOpts{ + TaskName: "test1", + Limit: limiter, + }, func(ctx context.Context) { + defer wg.Done() + time.Sleep(100 * time.Millisecond) + }) + require.NoError(t, err) + } + wg.Wait() + }) + + t.Run("MaxPendingDuration", func(t *testing.T) { + limiter := NewConcurrencyLimiter(1) + runner := NewAsyncRunner("test", 2*time.Millisecond) + defer runner.Stop() + var wg sync.WaitGroup + for i := 0; i < 10; i++ { + wg.Add(1) + err := runner.RunTask(context.Background(), TaskOpts{ + TaskName: "test2", + Limit: limiter, + }, func(ctx context.Context) { + defer wg.Done() + time.Sleep(100 * time.Millisecond) + }) + if err != nil { + wg.Done() + // task 0 running + // task 1 after recv by runner, blocked by task 1, wait on Acquire. + // task 2 enqueue pendingTasks + // task 3 enqueue pendingTasks + // task 4 enqueue pendingTasks, check pendingTasks[0] timeout, report error + require.GreaterOrEqual(t, i, 4) + } + time.Sleep(1 * time.Millisecond) + } + wg.Wait() + }) +} From 3475fe5ed71401f0e05fbdb8484393ff364ae9c0 Mon Sep 17 00:00:00 2001 From: Hu# Date: Tue, 9 Apr 2024 19:43:51 +0800 Subject: [PATCH 4/5] tests/tso: make TestPrimaryPriorityChange stable (#8045) ref tikv/pd#7969 Signed-off-by: husharp --- pkg/tso/keyspace_group_manager_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/tso/keyspace_group_manager_test.go b/pkg/tso/keyspace_group_manager_test.go index 9fe6da6edc9..ad67c49fa5e 100644 --- a/pkg/tso/keyspace_group_manager_test.go +++ b/pkg/tso/keyspace_group_manager_test.go @@ -1212,5 +1212,5 @@ func waitForPrimariesServing( } } return true - }, testutil.WithWaitFor(5*time.Second), testutil.WithTickInterval(50*time.Millisecond)) + }, testutil.WithWaitFor(10*time.Second), testutil.WithTickInterval(50*time.Millisecond)) } From 2b8116e7113697df12e00572c61b4a7a2ef283eb Mon Sep 17 00:00:00 2001 From: Hu# Date: Tue, 9 Apr 2024 20:31:51 +0800 Subject: [PATCH 5/5] *: fix ci covprofile by updating codecov version and replacing cover pkg (#8044) ref tikv/pd#4399 Signed-off-by: husharp Co-authored-by: ti-chi-bot[bot] <108142056+ti-chi-bot[bot]@users.noreply.github.com> --- .github/workflows/pd-tests.yaml | 2 +- client/Makefile | 6 +++--- scripts/ci-subtask.sh | 6 +++--- tests/integrations/Makefile | 4 ++-- 4 files changed, 9 insertions(+), 9 deletions(-) diff --git a/.github/workflows/pd-tests.yaml b/.github/workflows/pd-tests.yaml index 3674e41cf8a..1508c1a1457 100644 --- a/.github/workflows/pd-tests.yaml +++ b/.github/workflows/pd-tests.yaml @@ -72,7 +72,7 @@ jobs: TOTAL_JOBS: ${{needs.chunks.outputs.job-total}} run: for i in $(seq 1 $TOTAL_JOBS); do cat covprofile_$i >> covprofile; done - name: Send coverage - uses: codecov/codecov-action@v1 + uses: codecov/codecov-action@v4.2.0 with: token: ${{ secrets.CODECOV }} file: ./covprofile diff --git a/client/Makefile b/client/Makefile index 89c4936d179..3328bfe8d11 100644 --- a/client/Makefile +++ b/client/Makefile @@ -13,18 +13,18 @@ # limitations under the License. ROOT_PATH := $(shell pwd)/.. -GO_TOOLS_BIN_PATH := $(shell pwd)/../.tools/bin +GO_TOOLS_BIN_PATH := $(ROOT_PATH)/.tools/bin PATH := $(GO_TOOLS_BIN_PATH):$(PATH) SHELL := env PATH='$(PATH)' GOBIN='$(GO_TOOLS_BIN_PATH)' $(shell which bash) default: static tidy test test: failpoint-enable - CGO_ENABLE=1 go test ./... -v -tags deadlock -race -cover || { $(MAKE) failpoint-disable && exit 1; } + CGO_ENABLED=1 go test ./... -v -tags deadlock -race -cover || { $(MAKE) failpoint-disable && exit 1; } $(MAKE) failpoint-disable basic-test: failpoint-enable - CGO_ENABLE=1 go test ./... || { $(MAKE) failpoint-disable && exit 1; } + CGO_ENABLED=1 go test ./... || { $(MAKE) failpoint-disable && exit 1; } $(MAKE) failpoint-disable ci-test-job: diff --git a/scripts/ci-subtask.sh b/scripts/ci-subtask.sh index effd250965f..c00cba9c0a4 100755 --- a/scripts/ci-subtask.sh +++ b/scripts/ci-subtask.sh @@ -12,11 +12,11 @@ if [[ $2 -gt 9 ]]; then fi # Currently, we only have 3 integration tests, so we can hardcode the task index. - integrations_dir=./tests/integrations + integrations_dir=$(pwd)/tests/integrations integrations_tasks=($(find "$integrations_dir" -mindepth 1 -maxdepth 1 -type d)) for t in "${integrations_tasks[@]}"; do if [[ "$t" = "$integrations_dir/client" && $2 -eq 11 ]]; then - cd ./client && make ci-test-job && cat covprofile >> $ROOT_PATH_COV && cd .. || exit 1 + cd ./client && make ci-test-job && cat covprofile >> $ROOT_PATH_COV || exit 1 cd $integrations_dir && make ci-test-job test_name=client && cat ./client/covprofile >> $ROOT_PATH_COV || exit 1 elif [[ "$t" = "$integrations_dir/tso" && $2 -eq 12 ]]; then cd $integrations_dir && make ci-test-job test_name=tso && cat ./tso/covprofile >> $ROOT_PATH_COV || exit 1 @@ -61,5 +61,5 @@ else [[ $(($min_i + 1)) -eq $2 ]] && res+=($t) done - CGO_ENABLED=1 go test -timeout=15m -tags deadlock -race -covermode=atomic -coverprofile=$ROOT_PATH_COV -coverpkg=./... ${res[@]} + CGO_ENABLED=1 go test -timeout=15m -tags deadlock -race -cover -covermode=atomic -coverprofile=$ROOT_PATH_COV -coverpkg=./... ${res[@]} fi diff --git a/tests/integrations/Makefile b/tests/integrations/Makefile index b43ecabce7e..73cf4da8ab9 100644 --- a/tests/integrations/Makefile +++ b/tests/integrations/Makefile @@ -35,8 +35,8 @@ test: failpoint-enable $(MAKE) failpoint-disable ci-test-job: - if [ -f covprofile ]; then rm ./$(value test_name)/covprofile; fi - CGO_ENABLED=1 go test ./$(value test_name)/... -v -tags deadlock -race -cover -covermode=atomic -coverprofile=./$(value test_name)/covprofile -coverpkg=$(ROOT_PATH)/... + if [ -f ./$(value test_name)/covprofile ]; then rm ./$(value test_name)/covprofile; fi + CGO_ENABLED=1 go test ./$(value test_name)/... -v -tags deadlock -race -cover -covermode=atomic -coverprofile=./$(value test_name)/covprofile -coverpkg=../../... install-tools: cd $(ROOT_PATH) && $(MAKE) install-tools