Skip to content
This repository has been archived by the owner on Jul 24, 2024. It is now read-only.

cherry-pick #247 to relase 4.0 #283

Merged
merged 1 commit into from
May 14, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
36 changes: 21 additions & 15 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ PACKAGES := go list ./...
PACKAGE_DIRECTORIES := $(PACKAGES) | sed 's/github.com\/pingcap\/br\/*//'
GOCHECKER := awk '{ print } END { if (NR > 0) { exit 1 } }'


BR_PKG := github.com/pingcap/br

LDFLAGS += -X "$(BR_PKG)/pkg/utils.BRReleaseVersion=$(shell git describe --tags --dirty)"
Expand All @@ -21,27 +22,26 @@ all: check test build
build:
GO111MODULE=on go build -ldflags '$(LDFLAGS)' ${RACEFLAG} -o bin/br

build_for_integration_test:
GO111MODULE=on go test -c -cover -covermode=count \
build_for_integration_test: failpoint-enable
(GO111MODULE=on go test -c -cover -covermode=count \
-coverpkg=$(BR_PKG)/... \
-o bin/br.test
# build key locker
GO111MODULE=on go build ${RACEFLAG} -o bin/locker tests/br_key_locked/*.go
# build gc
GO111MODULE=on go build ${RACEFLAG} -o bin/gc tests/br_z_gc_safepoint/*.go
# build rawkv client
GO111MODULE=on go build ${RACEFLAG} -o bin/rawkv tests/br_rawkv/*.go

test:
GO111MODULE=on go test ${RACEFLAG} -tags leak ./...

testcover:
-o bin/br.test && \
GO111MODULE=on go build ${RACEFLAG} -o bin/locker tests/br_key_locked/*.go && \
GO111MODULE=on go build ${RACEFLAG} -o bin/gc tests/br_z_gc_safepoint/*.go && \
GO111MODULE=on go build ${RACEFLAG} -o bin/rawkv tests/br_rawkv/*.go) || (make failpoint-disable && exit 1)
@make failpoint-disable

test: failpoint-enable
GO111MODULE=on go test ${RACEFLAG} -tags leak ./... || ( make failpoint-disable && exit 1 )
@make failpoint-disable

testcover: tools failpoint-enable
GO111MODULE=on tools/bin/overalls \
-project=$(BR_PKG) \
-covermode=count \
-ignore='.git,vendor,tests,_tools' \
-debug \
-- -coverpkg=./...
-- -coverpkg=./... || ( make failpoint-disable && exit 1 )

integration_test: build build_for_integration_test
@which bin/tidb-server
Expand Down Expand Up @@ -103,4 +103,10 @@ tidy:
GO111MODULE=on go mod tidy
git diff --quiet go.mod go.sum

failpoint-enable: tools
tools/bin/failpoint-ctl enable

failpoint-disable: tools
tools/bin/failpoint-ctl disable

.PHONY: tools
2 changes: 2 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ require (
github.com/google/uuid v1.1.1
github.com/pingcap/check v0.0.0-20200212061837-5e12011dc712
github.com/pingcap/errors v0.11.5-0.20190809092503-95897b64e011
github.com/pingcap/failpoint v0.0.0-20200506114213-c17f16071c53
github.com/pingcap/kvproto v0.0.0-20200509065137-6a4d5c264a8b
github.com/pingcap/log v0.0.0-20200117041106-d28c14d3b1cd
github.com/pingcap/parser v0.0.0-20200512060642-fa8905763928
Expand All @@ -26,6 +27,7 @@ require (
github.com/spf13/cobra v1.0.0
github.com/spf13/pflag v1.0.5
go.etcd.io/etcd v0.5.0-alpha.5.0.20191023171146-3cf2f69b5738
go.uber.org/multierr v1.5.0
go.uber.org/zap v1.15.0
golang.org/x/oauth2 v0.0.0-20191202225959-858c2ad4c8b6
google.golang.org/api v0.15.1
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -443,6 +443,8 @@ github.com/pingcap/failpoint v0.0.0-20191029060244-12f4ac2fd11d h1:F8vp38kTAckN+
github.com/pingcap/failpoint v0.0.0-20191029060244-12f4ac2fd11d/go.mod h1:DNS3Qg7bEDhU6EXNHF+XSv/PGznQaMJ5FWvctpm6pQI=
github.com/pingcap/failpoint v0.0.0-20200210140405-f8f9fb234798 h1:6DMbRqPI1qzQ8N1xc3+nKY8IxSACd9VqQKkRVvbyoIg=
github.com/pingcap/failpoint v0.0.0-20200210140405-f8f9fb234798/go.mod h1:DNS3Qg7bEDhU6EXNHF+XSv/PGznQaMJ5FWvctpm6pQI=
github.com/pingcap/failpoint v0.0.0-20200506114213-c17f16071c53 h1:8sC8OLinmaw24xLeeJlYBFvUBsOiOYBtNqTuVOTnynQ=
github.com/pingcap/failpoint v0.0.0-20200506114213-c17f16071c53/go.mod h1:w4PEZ5y16LeofeeGwdgZB4ddv9bLyDuIX+ljstgKZyk=
github.com/pingcap/fn v0.0.0-20191016082858-07623b84a47d h1:rCmRK0lCRrHMUbS99BKFYhK9YxJDNw0xB033cQbYo0s=
github.com/pingcap/fn v0.0.0-20191016082858-07623b84a47d/go.mod h1:fMRU1BA1y+r89AxUoaAar4JjrhUkVDt0o0Np6V8XbDQ=
github.com/pingcap/goleveldb v0.0.0-20191226122134-f82aafb29989 h1:surzm05a8C9dN8dIUmo4Be2+pMRb6f55i+UIYrluu2E=
Expand Down
154 changes: 123 additions & 31 deletions pkg/restore/split_client.go
100644 → 100755
Original file line number Diff line number Diff line change
Expand Up @@ -16,16 +16,25 @@ import (
"sync"

"github.com/pingcap/errors"
"github.com/pingcap/failpoint"
"github.com/pingcap/kvproto/pkg/errorpb"
"github.com/pingcap/kvproto/pkg/kvrpcpb"
"github.com/pingcap/kvproto/pkg/metapb"
"github.com/pingcap/kvproto/pkg/pdpb"
"github.com/pingcap/kvproto/pkg/tikvpb"
"github.com/pingcap/log"
pd "github.com/pingcap/pd/v4/client"
"github.com/pingcap/pd/v4/server/schedule/placement"
"go.uber.org/multierr"
"go.uber.org/zap"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials"
)

const (
splitRegionMaxRetryTime = 4
)

// SplitClient is an external client used by RegionSplitter.
type SplitClient interface {
// GetStore gets a store by a store id.
Expand Down Expand Up @@ -186,48 +195,131 @@ func (c *pdClient) SplitRegion(ctx context.Context, regionInfo *RegionInfo, key
}, nil
}

func (c *pdClient) BatchSplitRegions(
ctx context.Context, regionInfo *RegionInfo, keys [][]byte,
) ([]*RegionInfo, error) {
var peer *metapb.Peer
if regionInfo.Leader != nil {
peer = regionInfo.Leader
} else {
if len(regionInfo.Region.Peers) == 0 {
return nil, errors.New("region does not have peer")
func splitRegionWithFailpoint(
ctx context.Context,
regionInfo *RegionInfo,
peer *metapb.Peer,
client tikvpb.TikvClient,
keys [][]byte,
) (*kvrpcpb.SplitRegionResponse, error) {
failpoint.Inject("not-leader-error", func(injectNewLeader failpoint.Value) {
log.Debug("failpoint not-leader-error injected.")
resp := &kvrpcpb.SplitRegionResponse{
RegionError: &errorpb.Error{
NotLeader: &errorpb.NotLeader{
RegionId: regionInfo.Region.Id,
},
},
}
peer = regionInfo.Region.Peers[0]
}

storeID := peer.GetStoreId()
store, err := c.GetStore(ctx, storeID)
if err != nil {
return nil, err
}
opt := grpc.WithInsecure()
if c.tlsConf != nil {
opt = grpc.WithTransportCredentials(credentials.NewTLS(c.tlsConf))
}
conn, err := grpc.Dial(store.GetAddress(), opt)
if err != nil {
return nil, err
}
defer conn.Close()
client := tikvpb.NewTikvClient(conn)
resp, err := client.SplitRegion(ctx, &kvrpcpb.SplitRegionRequest{
if injectNewLeader.(bool) {
resp.RegionError.NotLeader.Leader = regionInfo.Leader
}
failpoint.Return(resp, nil)
})
failpoint.Inject("somewhat-retryable-error", func() {
log.Debug("failpoint somewhat-retryable-error injected.")
failpoint.Return(&kvrpcpb.SplitRegionResponse{
RegionError: &errorpb.Error{
ServerIsBusy: &errorpb.ServerIsBusy{},
},
}, nil)
})
return client.SplitRegion(ctx, &kvrpcpb.SplitRegionRequest{
Context: &kvrpcpb.Context{
RegionId: regionInfo.Region.Id,
RegionEpoch: regionInfo.Region.RegionEpoch,
Peer: peer,
},
SplitKeys: keys,
})
}

func (c *pdClient) sendSplitRegionRequest(
ctx context.Context, regionInfo *RegionInfo, keys [][]byte,
) (*kvrpcpb.SplitRegionResponse, error) {
var splitErrors error
for i := 0; i < splitRegionMaxRetryTime; i++ {
var peer *metapb.Peer
if regionInfo.Leader != nil {
peer = regionInfo.Leader
} else {
if len(regionInfo.Region.Peers) == 0 {
return nil, multierr.Append(splitErrors,
errors.Errorf("region[%d] doesn't have any peer", regionInfo.Region.GetId()))
}
peer = regionInfo.Region.Peers[0]
}
storeID := peer.GetStoreId()
store, err := c.GetStore(ctx, storeID)
if err != nil {
return nil, multierr.Append(splitErrors, err)
}
opt := grpc.WithInsecure()
if c.tlsConf != nil {
opt = grpc.WithTransportCredentials(credentials.NewTLS(c.tlsConf))
}
conn, err := grpc.Dial(store.GetAddress(), opt)
if err != nil {
return nil, multierr.Append(splitErrors, err)
}
defer conn.Close()
client := tikvpb.NewTikvClient(conn)
resp, err := splitRegionWithFailpoint(ctx, regionInfo, peer, client, keys)
if err != nil {
return nil, multierr.Append(splitErrors, err)
}
if resp.RegionError != nil {
splitErrors = multierr.Append(splitErrors,
errors.Errorf("split region failed: region=%v, err=%v",
regionInfo.Region, resp.RegionError))
if nl := resp.RegionError.NotLeader; nl != nil {
if leader := nl.GetLeader(); leader != nil {
regionInfo.Leader = leader
} else {
newRegionInfo, findLeaderErr := c.GetRegionByID(ctx, nl.RegionId)
if findLeaderErr != nil {
return nil, multierr.Append(splitErrors, findLeaderErr)
}
if !checkRegionEpoch(newRegionInfo, regionInfo) {
return nil, multierr.Append(splitErrors, ErrEpochNotMatch)
}
log.Info("find new leader", zap.Uint64("new leader", newRegionInfo.Leader.Id))
regionInfo = newRegionInfo
}
log.Info("split region meet not leader error, retrying",
zap.Int("retry times", i),
zap.Uint64("regionID", regionInfo.Region.Id),
zap.Any("new leader", regionInfo.Leader),
)
continue
}
// TODO: we don't handle RegionNotMatch and RegionNotFound here,
// because I think we don't have enough information to retry.
// But maybe we can handle them here by some information the error itself provides.
if resp.RegionError.ServerIsBusy != nil ||
resp.RegionError.StaleCommand != nil {
log.Warn("a error occurs on split region",
zap.Int("retry times", i),
zap.Uint64("regionID", regionInfo.Region.Id),
zap.String("error", resp.RegionError.Message),
zap.Any("error verbose", resp.RegionError),
)
continue
}
return nil, splitErrors
}
return resp, nil
}
return nil, splitErrors
}

func (c *pdClient) BatchSplitRegions(
ctx context.Context, regionInfo *RegionInfo, keys [][]byte,
) ([]*RegionInfo, error) {
resp, err := c.sendSplitRegionRequest(ctx, regionInfo, keys)
if err != nil {
return nil, err
}
if resp.RegionError != nil {
return nil, errors.Errorf("split region failed: region=%v, err=%v", regionInfo.Region, resp.RegionError)
}

regions := resp.GetRegions()
newRegionInfos := make([]*RegionInfo, 0, len(regions))
Expand Down
84 changes: 84 additions & 0 deletions tests/br_split_region_fail/run.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
#!/bin/sh
#
# Copyright 2020 PingCAP, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# See the License for the specific language governing permissions and
# limitations under the License.

set -eu
DB="$TEST_NAME"
TABLE="usertable"
LOG="not-leader.log"
DB_COUNT=3

for i in $(seq $DB_COUNT); do
run_sql "CREATE DATABASE $DB${i};"
go-ycsb load mysql -P tests/$TEST_NAME/workload -p mysql.host=$TIDB_IP -p mysql.port=$TIDB_PORT -p mysql.user=root -p mysql.db=$DB${i}
done

for i in $(seq $DB_COUNT); do
row_count_ori[${i}]=$(run_sql "SELECT COUNT(*) FROM $DB${i}.$TABLE;" | awk '/COUNT/{print $2}')
done


# backup full
echo "backup start..."
run_br --pd $PD_ADDR backup full -s "local://$TEST_DIR/$DB" --ratelimit 5 --concurrency 4

rm -f $LOG

for i in $(seq $DB_COUNT); do
run_sql "DROP DATABASE $DB${i};"
done


# restore full
echo "restore start..."

unset BR_LOG_TO_TERM
GO_FAILPOINTS="github.com/pingcap/br/pkg/restore/not-leader-error=1*return(true)->1*return(false);\
github.com/pingcap/br/pkg/restore/somewhat-retryable-error=3*return(true)" \
run_br restore full -s "local://$TEST_DIR/$DB" --pd $PD_ADDR --ratelimit 1024 --log-file $LOG || true
BR_LOG_TO_TERM=1

grep "a error occurs on split region" $LOG && \
grep "split region meet not leader error" $LOG && \
grep "Full restore Success" $LOG && \
grep "find new leader" $LOG

if [ $? -ne 0 ]; then
echo "failed to retry on failpoint."
echo "full log:"
cat $LOG
exit 1
fi

for i in $(seq $DB_COUNT); do
row_count_new[${i}]=$(run_sql "SELECT COUNT(*) FROM $DB${i}.$TABLE;" | awk '/COUNT/{print $2}')
done

fail=false
for i in $(seq $DB_COUNT); do
if [ "${row_count_ori[i]}" != "${row_count_new[i]}" ];then
fail=true
echo "TEST: [$TEST_NAME] fail on database $DB${i}"
fi
echo "database $DB${i} [original] row count: ${row_count_ori[i]}, [after br] row count: ${row_count_new[i]}"
done

if $fail; then
echo "TEST: [$TEST_NAME] failed!"
exit 1
fi

echo "TEST $TEST_NAME passed."


12 changes: 12 additions & 0 deletions tests/br_split_region_fail/workload
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
recordcount=1000
operationcount=0
workload=core

readallfields=true

readproportion=0
updateproportion=0
scanproportion=0
insertproportion=0

requestdistribution=uniform
5 changes: 4 additions & 1 deletion tools/Makefile
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
all: bin/goimports bin/govet bin/revive bin/overalls bin/golangci-lint
all: bin/goimports bin/govet bin/revive bin/overalls bin/golangci-lint bin/failpoint-ctl

bin/goimports:
go build -o $@ golang.org/x/tools/cmd/goimports
Expand All @@ -14,3 +14,6 @@ bin/overalls:

bin/golangci-lint:
go build -o $@ github.com/golangci/golangci-lint/cmd/golangci-lint

bin/failpoint-ctl:
go build -o $@ github.com/pingcap/failpoint/failpoint-ctl
1 change: 1 addition & 0 deletions tools/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ require (
github.com/go-playground/overalls v0.0.0-20191218162659-7df9f728c018
github.com/golangci/golangci-lint v1.26.0
github.com/mgechev/revive v1.0.2
github.com/pingcap/failpoint v0.0.0-20200506114213-c17f16071c53
github.com/yookoala/realpath v1.0.0 // indirect
golang.org/x/tools v0.0.0-20200422205258-72e4a01eba43
gopkg.in/go-playground/assert.v1 v1.2.1 // indirect
Expand Down
Loading