Skip to content
This repository has been archived by the owner on Jul 24, 2024. It is now read-only.

move waiting reject stores in import file #222

Merged
merged 6 commits into from
Apr 1, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 10 additions & 6 deletions pkg/restore/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -347,15 +347,18 @@ func (rc *Client) CreateTables(

// RemoveTiFlashReplica removes all the tiflash replicas of a table
// TODO: remove this after tiflash supports restore
func (rc *Client) RemoveTiFlashReplica(tables []*utils.Table, placementRules []placement.Rule) error {
func (rc *Client) RemoveTiFlashReplica(
tables []*utils.Table, newTables []*model.TableInfo, placementRules []placement.Rule) error {
schemas := make([]*backup.Schema, 0, len(tables))
var updateReplica bool
for _, table := range tables {
if rule := utils.SearchPlacementRule(table.Info.ID, placementRules, placement.Learner); rule != nil {
// must use new table id to search placement rules
// here newTables and tables must have same order
for i, table := range tables {
if rule := utils.SearchPlacementRule(newTables[i].ID, placementRules, placement.Learner); rule != nil {
table.TiFlashReplicas = rule.Count
updateReplica = true
}
tableData, err := json.Marshal(table.Info)
tableData, err := json.Marshal(newTables[i])
if err != nil {
return errors.Trace(err)
}
Expand Down Expand Up @@ -454,6 +457,7 @@ func (rc *Client) setSpeedLimit() error {
func (rc *Client) RestoreFiles(
files []*backup.File,
rewriteRules *RewriteRules,
rejectStoreMap map[uint64]bool,
updateCh glue.Progress,
) (err error) {
start := time.Now()
Expand Down Expand Up @@ -486,7 +490,7 @@ func (rc *Client) RestoreFiles(
select {
case <-rc.ctx.Done():
errCh <- rc.ctx.Err()
case errCh <- rc.fileImporter.Import(fileReplica, rewriteRules):
case errCh <- rc.fileImporter.Import(fileReplica, rejectStoreMap, rewriteRules):
updateCh.Inc()
}
})
Expand Down Expand Up @@ -537,7 +541,7 @@ func (rc *Client) RestoreRaw(startKey []byte, endKey []byte, files []*backup.Fil
select {
case <-rc.ctx.Done():
errCh <- rc.ctx.Err()
case errCh <- rc.fileImporter.Import(fileReplica, emptyRules):
case errCh <- rc.fileImporter.Import(fileReplica, nil, emptyRules):
updateCh.Inc()
}
})
Expand Down
4 changes: 4 additions & 0 deletions pkg/restore/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,10 @@ func (s *testRestoreClientSuite) TestCreateTables(c *C) {
}
rules, newTables, err := client.CreateTables(s.mock.Domain, tables, 0)
c.Assert(err, IsNil)
// make sure tables and newTables have same order
for i, t := range tables {
c.Assert(newTables[i].Name, Equals, t.Info.Name)
}
for _, nt := range newTables {
c.Assert(nt.Name.String(), Matches, "test[0-3]")
}
Expand Down
26 changes: 25 additions & 1 deletion pkg/restore/import.go
Original file line number Diff line number Diff line change
Expand Up @@ -175,7 +175,11 @@ func (importer *FileImporter) SetRawRange(startKey, endKey []byte) error {

// Import tries to import a file.
// All rules must contain encoded keys.
func (importer *FileImporter) Import(file *backup.File, rewriteRules *RewriteRules) error {
func (importer *FileImporter) Import(
file *backup.File,
rejectStoreMap map[uint64]bool,
rewriteRules *RewriteRules,
) error {
log.Debug("import file", zap.Stringer("file", file))
// Rewrite the start key and end key of file to scan regions
var startKey, endKey []byte
Expand All @@ -193,6 +197,9 @@ func (importer *FileImporter) Import(file *backup.File, rewriteRules *RewriteRul
zap.Stringer("file", file),
zap.Binary("startKey", startKey),
zap.Binary("endKey", endKey))

needReject := len(rejectStoreMap) > 0

err = utils.WithRetry(importer.ctx, func() error {
ctx, cancel := context.WithTimeout(importer.ctx, importScanRegionTime)
defer cancel()
Expand All @@ -202,6 +209,23 @@ func (importer *FileImporter) Import(file *backup.File, rewriteRules *RewriteRul
if errScanRegion != nil {
return errors.Trace(errScanRegion)
}

if needReject {
// TODO remove when TiFlash support restore
startTime := time.Now()
log.Info("start to wait for removing rejected stores", zap.Reflect("rejectStores", rejectStoreMap))
for _, region := range regionInfos {
if !waitForRemoveRejectStores(ctx, importer.metaClient, region, rejectStoreMap) {
log.Error("waiting for removing rejected stores failed",
zap.Stringer("region", region.Region))
return errors.New("waiting for removing rejected stores failed")
}
}
log.Info("waiting for removing rejected stores done",
zap.Int("regions", len(regionInfos)), zap.Duration("take", time.Since(startTime)))
needReject = false
}

log.Debug("scan regions", zap.Stringer("file", file), zap.Int("count", len(regionInfos)))
// Try to download and ingest the file in every region
for _, regionInfo := range regionInfos {
Expand Down
70 changes: 0 additions & 70 deletions pkg/restore/split.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,6 @@ func (rs *RegionSplitter) Split(
ctx context.Context,
ranges []rtree.Range,
rewriteRules *RewriteRules,
rejectStores map[uint64]bool,
onSplit OnSplitFunc,
) error {
if len(ranges) == 0 {
Expand Down Expand Up @@ -95,14 +94,12 @@ func (rs *RegionSplitter) Split(
}
interval := SplitRetryInterval
scatterRegions := make([]*RegionInfo, 0)
allRegions := make([]*RegionInfo, 0)
SplitRegions:
for i := 0; i < SplitRetryTimes; i++ {
regions, errScan := paginateScanRegion(ctx, rs.client, minKey, maxKey, scanRegionPaginationLimit)
if errScan != nil {
return errors.Trace(errScan)
}
allRegions = append(allRegions, regions...)
if len(regions) == 0 {
log.Warn("cannot scan any region")
return nil
Expand Down Expand Up @@ -145,19 +142,6 @@ SplitRegions:
if errSplit != nil {
return errors.Trace(errSplit)
}
if len(rejectStores) > 0 {
startTime = time.Now()
log.Info("start to wait for removing rejected stores", zap.Reflect("rejectStores", rejectStores))
for _, region := range allRegions {
if !rs.waitForRemoveRejectStores(ctx, region, rejectStores) {
log.Error("waiting for removing rejected stores failed",
zap.Stringer("region", region.Region))
return errors.New("waiting for removing rejected stores failed")
}
}
log.Info("waiting for removing rejected stores done",
zap.Int("regions", len(allRegions)), zap.Duration("take", time.Since(startTime)))
}
log.Info("start to wait for scattering regions",
zap.Int("regions", len(scatterRegions)), zap.Duration("take", time.Since(startTime)))
startTime = time.Now()
Expand Down Expand Up @@ -211,30 +195,6 @@ func (rs *RegionSplitter) isScatterRegionFinished(ctx context.Context, regionID
return ok, nil
}

func (rs *RegionSplitter) hasRejectStorePeer(
ctx context.Context,
regionID uint64,
rejectStores map[uint64]bool,
) (bool, error) {
regionInfo, err := rs.client.GetRegionByID(ctx, regionID)
if err != nil {
return false, err
}
if regionInfo == nil {
return false, nil
}
for _, peer := range regionInfo.Region.GetPeers() {
if rejectStores[peer.GetStoreId()] {
return true, nil
}
}
retryTimes := ctx.Value(retryTimes).(int)
if retryTimes > 10 {
log.Warn("get region info", zap.Stringer("region", regionInfo.Region))
}
return false, nil
}

func (rs *RegionSplitter) waitForSplit(ctx context.Context, regionID uint64) {
interval := SplitCheckInterval
for i := 0; i < SplitCheckMaxRetryTimes; i++ {
Expand Down Expand Up @@ -280,36 +240,6 @@ func (rs *RegionSplitter) waitForScatterRegion(ctx context.Context, regionInfo *
}
}

func (rs *RegionSplitter) waitForRemoveRejectStores(
ctx context.Context,
regionInfo *RegionInfo,
rejectStores map[uint64]bool,
) bool {
interval := RejectStoreCheckInterval
regionID := regionInfo.Region.GetId()
for i := 0; i < RejectStoreCheckRetryTimes; i++ {
ctx1 := context.WithValue(ctx, retryTimes, i)
ok, err := rs.hasRejectStorePeer(ctx1, regionID, rejectStores)
if err != nil {
log.Warn("wait for rejecting store failed",
zap.Stringer("region", regionInfo.Region),
zap.Error(err))
return false
}
// Do not have any peer in the rejected store, return true
if !ok {
return true
}
interval = 2 * interval
if interval > RejectStoreMaxCheckInterval {
interval = RejectStoreMaxCheckInterval
}
time.Sleep(interval)
}

return false
}

func (rs *RegionSplitter) splitAndScatterRegions(
ctx context.Context, regionInfo *RegionInfo, keys [][]byte,
) ([]*RegionInfo, error) {
Expand Down
2 changes: 1 addition & 1 deletion pkg/restore/split_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -193,7 +193,7 @@ func (s *testRestoreUtilSuite) TestSplit(c *C) {
regionSplitter := NewRegionSplitter(client)

ctx := context.Background()
err := regionSplitter.Split(ctx, ranges, rewriteRules, map[uint64]bool{}, func(key [][]byte) {})
err := regionSplitter.Split(ctx, ranges, rewriteRules, func(key [][]byte) {})
if err != nil {
c.Assert(err, IsNil, Commentf("split regions failed: %v", err))
}
Expand Down
67 changes: 57 additions & 10 deletions pkg/restore/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ import (
"github.com/pingcap/tidb/util/codec"
"go.uber.org/zap"

"github.com/pingcap/br/pkg/conn"
"github.com/pingcap/br/pkg/glue"
"github.com/pingcap/br/pkg/rtree"
"github.com/pingcap/br/pkg/summary"
Expand Down Expand Up @@ -332,16 +331,8 @@ func SplitRanges(
summary.CollectDuration("split region", elapsed)
}()
splitter := NewRegionSplitter(NewSplitClient(client.GetPDClient(), client.GetTLSConfig()))
tiflashStores, err := conn.GetAllTiKVStores(ctx, client.GetPDClient(), conn.TiFlashOnly)
if err != nil {
return errors.Trace(err)
}
storeMap := make(map[uint64]bool)
for _, store := range tiflashStores {
storeMap[store.GetId()] = true
}

return splitter.Split(ctx, ranges, rewriteRules, storeMap, func(keys [][]byte) {
return splitter.Split(ctx, ranges, rewriteRules, func(keys [][]byte) {
for range keys {
updateCh.Inc()
}
Expand Down Expand Up @@ -416,3 +407,59 @@ func paginateScanRegion(
}
return regions, nil
}

func hasRejectStorePeer(
ctx context.Context,
client SplitClient,
regionID uint64,
rejectStores map[uint64]bool,
) (bool, error) {
regionInfo, err := client.GetRegionByID(ctx, regionID)
if err != nil {
return false, err
}
if regionInfo == nil {
return false, nil
}
for _, peer := range regionInfo.Region.GetPeers() {
if rejectStores[peer.GetStoreId()] {
return true, nil
}
}
retryTimes := ctx.Value(retryTimes).(int)
if retryTimes > 10 {
log.Warn("get region info", zap.Stringer("region", regionInfo.Region))
}
return false, nil
}

func waitForRemoveRejectStores(
ctx context.Context,
client SplitClient,
regionInfo *RegionInfo,
rejectStores map[uint64]bool,
) bool {
interval := RejectStoreCheckInterval
regionID := regionInfo.Region.GetId()
for i := 0; i < RejectStoreCheckRetryTimes; i++ {
ctx1 := context.WithValue(ctx, retryTimes, i)
ok, err := hasRejectStorePeer(ctx1, client, regionID, rejectStores)
if err != nil {
log.Warn("wait for rejecting store failed",
zap.Stringer("region", regionInfo.Region),
zap.Error(err))
return false
}
// Do not have any peer in the rejected store, return true
if !ok {
return true
}
interval = 2 * interval
if interval > RejectStoreMaxCheckInterval {
interval = RejectStoreMaxCheckInterval
}
time.Sleep(interval)
}

return false
}
15 changes: 13 additions & 2 deletions pkg/task/restore.go
Original file line number Diff line number Diff line change
Expand Up @@ -174,7 +174,8 @@ func RunRestore(c context.Context, g glue.Glue, cmdName string, cfg *RestoreConf
if err != nil {
return err
}
err = client.RemoveTiFlashReplica(tables, placementRules)

err = client.RemoveTiFlashReplica(tables, newTables, placementRules)
if err != nil {
return err
}
Expand Down Expand Up @@ -222,6 +223,16 @@ func RunRestore(c context.Context, g glue.Glue, cmdName string, cfg *RestoreConf
if batchSize > maxRestoreBatchSizeLimit {
batchSize = maxRestoreBatchSizeLimit // 256
}

tiflashStores, err := conn.GetAllTiKVStores(ctx, client.GetPDClient(), conn.TiFlashOnly)
if err != nil {
return errors.Trace(err)
}
rejectStoreMap := make(map[uint64]bool)
for _, store := range tiflashStores {
rejectStoreMap[store.GetId()] = true
}

for {
if len(ranges) == 0 {
break
Expand All @@ -246,7 +257,7 @@ func RunRestore(c context.Context, g glue.Glue, cmdName string, cfg *RestoreConf
}

// After split, we can restore backup files.
err = client.RestoreFiles(fileBatch, rewriteRules, updateCh)
err = client.RestoreFiles(fileBatch, rewriteRules, rejectStoreMap, updateCh)
if err != nil {
break
}
Expand Down