Skip to content
This repository has been archived by the owner on Jul 24, 2024. It is now read-only.

restore: merge tidb-tools/pkg/restore-util #146

Merged
merged 18 commits into from
Jan 22, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 4 additions & 3 deletions .golangci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,8 @@ issues:
text: "Potential HTTP request made with variable url"
linters:
- gosec
- path: .go
text: "Use of weak random number generator"
# TODO Remove it.
- path: split_client.go
text: "SA1019:"
linters:
- gosec
- staticcheck
9 changes: 4 additions & 5 deletions cmd/validate.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@ import (
"github.com/pingcap/log"
"github.com/pingcap/parser/model"
"github.com/pingcap/pd/pkg/mock/mockid"
restore_util "github.com/pingcap/tidb-tools/pkg/restore-util"
"github.com/spf13/cobra"
"go.uber.org/zap"

Expand Down Expand Up @@ -187,15 +186,15 @@ func newBackupMetaCommand() *cobra.Command {
tables = append(tables, db.Tables...)
}
// Check if the ranges of files overlapped
rangeTree := restore_util.NewRangeTree()
rangeTree := restore.NewRangeTree()
for _, file := range files {
if out := rangeTree.InsertRange(restore_util.Range{
if out := rangeTree.InsertRange(restore.Range{
StartKey: file.GetStartKey(),
EndKey: file.GetEndKey(),
}); out != nil {
log.Error(
"file ranges overlapped",
zap.Stringer("out", out.(*restore_util.Range)),
zap.Stringer("out", out.(*restore.Range)),
zap.Stringer("file", file),
)
}
Expand All @@ -206,7 +205,7 @@ func newBackupMetaCommand() *cobra.Command {
for offset := uint64(0); offset < tableIDOffset; offset++ {
_, _ = tableIDAllocator.Alloc() // Ignore error
}
rewriteRules := &restore_util.RewriteRules{
rewriteRules := &restore.RewriteRules{
Table: make([]*import_sstpb.RewriteRule, 0),
Data: make([]*import_sstpb.RewriteRule, 0),
}
Expand Down
1 change: 0 additions & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@ require (
github.com/pingcap/parser v0.0.0-20191210060830-bdf23a7ade01
github.com/pingcap/pd v1.1.0-beta.0.20191212045800-234784c7a9c5
github.com/pingcap/tidb v1.1.0-beta.0.20191213040028-9009da737834
github.com/pingcap/tidb-tools v3.1.0-beta.0.20191223064326-e9c7a23a8dcb+incompatible
github.com/pingcap/tipb v0.0.0-20191209145133-44f75c9bef33
github.com/prometheus/client_golang v1.0.0
github.com/sirupsen/logrus v1.4.2
Expand Down
3 changes: 1 addition & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -283,9 +283,8 @@ github.com/pingcap/sysutil v0.0.0-20191126040022-986c5b3ed9a3 h1:HCNif3lukL83gNC
github.com/pingcap/sysutil v0.0.0-20191126040022-986c5b3ed9a3/go.mod h1:Futrrmuw98pEsbEmoPsjw8aKLCmixwHEmT2rF+AsXGw=
github.com/pingcap/tidb v1.1.0-beta.0.20191213040028-9009da737834 h1:eNf7bDY39moIzzcs5+PhLLW0BM2D2yrzFbjW/X42y0s=
github.com/pingcap/tidb v1.1.0-beta.0.20191213040028-9009da737834/go.mod h1:VWx47QOXISBHHtZeWrDQlBOdbvth9TE9gei6QpoqJ4g=
github.com/pingcap/tidb-tools v3.0.6-0.20191106033616-90632dda3863+incompatible h1:H1jg0aDWz2SLRh3hNBo2HFtnuHtudIUvBumU7syRkic=
github.com/pingcap/tidb-tools v3.0.6-0.20191106033616-90632dda3863+incompatible/go.mod h1:XGdcy9+yqlDSEMTpOXnwf3hiTeqrV6MN/u1se9N8yIM=
github.com/pingcap/tidb-tools v3.1.0-beta.0.20191223064326-e9c7a23a8dcb+incompatible h1:GxWxXVqA2aAZIgS+bEpasJkkspu9Jom1/oB2NmP7t/o=
github.com/pingcap/tidb-tools v3.1.0-beta.0.20191223064326-e9c7a23a8dcb+incompatible/go.mod h1:XGdcy9+yqlDSEMTpOXnwf3hiTeqrV6MN/u1se9N8yIM=
github.com/pingcap/tipb v0.0.0-20191209145133-44f75c9bef33 h1:cTSaVv1hue17BCPqt+sURADTFSMpSD26ZuvKRyYIjJs=
github.com/pingcap/tipb v0.0.0-20191209145133-44f75c9bef33/go.mod h1:RtkHW8WbcNxj8lsbzjaILci01CtYnYbIkQhjyZWrWVI=
github.com/pkg/errors v0.8.0/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
Expand Down
13 changes: 6 additions & 7 deletions pkg/restore/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@ import (
"github.com/pingcap/log"
"github.com/pingcap/parser/model"
pd "github.com/pingcap/pd/client"
restore_util "github.com/pingcap/tidb-tools/pkg/restore-util"
"github.com/pingcap/tidb/domain"
"github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/store/tikv/oracle"
Expand Down Expand Up @@ -108,7 +107,7 @@ func (rc *Client) InitBackupMeta(backupMeta *backup.BackupMeta, backend *backup.
rc.databases = databases
rc.backupMeta = backupMeta

metaClient := restore_util.NewClient(rc.pdClient)
metaClient := NewSplitClient(rc.pdClient)
importClient := NewImportClient(metaClient)
rc.fileImporter = NewFileImporter(rc.ctx, metaClient, importClient, backend, rc.rateLimit)
return nil
Expand Down Expand Up @@ -189,8 +188,8 @@ func (rc *Client) CreateTables(
dom *domain.Domain,
tables []*utils.Table,
newTS uint64,
) (*restore_util.RewriteRules, []*model.TableInfo, error) {
rewriteRules := &restore_util.RewriteRules{
) (*RewriteRules, []*model.TableInfo, error) {
rewriteRules := &RewriteRules{
Table: make([]*import_sstpb.RewriteRule, 0),
Data: make([]*import_sstpb.RewriteRule, 0),
}
Expand Down Expand Up @@ -232,7 +231,7 @@ func (rc *Client) setSpeedLimit() error {
// RestoreTable tries to restore the data of a table.
func (rc *Client) RestoreTable(
table *utils.Table,
rewriteRules *restore_util.RewriteRules,
rewriteRules *RewriteRules,
updateCh chan<- struct{},
) (err error) {
start := time.Now()
Expand Down Expand Up @@ -300,7 +299,7 @@ func (rc *Client) RestoreTable(
// RestoreDatabase tries to restore the data of a database
func (rc *Client) RestoreDatabase(
db *utils.Database,
rewriteRules *restore_util.RewriteRules,
rewriteRules *RewriteRules,
updateCh chan<- struct{},
) (err error) {
start := time.Now()
Expand Down Expand Up @@ -336,7 +335,7 @@ func (rc *Client) RestoreDatabase(

// RestoreAll tries to restore all the data of backup files.
func (rc *Client) RestoreAll(
rewriteRules *restore_util.RewriteRules,
rewriteRules *RewriteRules,
updateCh chan<- struct{},
) (err error) {
start := time.Now()
Expand Down
33 changes: 16 additions & 17 deletions pkg/restore/import.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@ import (
"github.com/pingcap/kvproto/pkg/kvrpcpb"
"github.com/pingcap/log"
"github.com/pingcap/pd/pkg/codec"
restore_util "github.com/pingcap/tidb-tools/pkg/restore-util"
"go.uber.org/zap"
"google.golang.org/grpc"

Expand Down Expand Up @@ -60,12 +59,12 @@ type ImporterClient interface {

type importClient struct {
mu sync.Mutex
metaClient restore_util.Client
metaClient SplitClient
clients map[uint64]import_sstpb.ImportSSTClient
}

// NewImportClient returns a new ImporterClient
func NewImportClient(metaClient restore_util.Client) ImporterClient {
func NewImportClient(metaClient SplitClient) ImporterClient {
return &importClient{
metaClient: metaClient,
clients: make(map[uint64]import_sstpb.ImportSSTClient),
Expand Down Expand Up @@ -133,7 +132,7 @@ func (ic *importClient) getImportClient(

// FileImporter used to import a file to TiKV.
type FileImporter struct {
metaClient restore_util.Client
metaClient SplitClient
importClient ImporterClient
backend *backup.StorageBackend
rateLimit uint64
Expand All @@ -145,7 +144,7 @@ type FileImporter struct {
// NewFileImporter returns a new file importClient.
func NewFileImporter(
ctx context.Context,
metaClient restore_util.Client,
metaClient SplitClient,
importClient ImporterClient,
backend *backup.StorageBackend,
rateLimit uint64,
Expand All @@ -163,7 +162,7 @@ func NewFileImporter(

// Import tries to import a file.
// All rules must contain encoded keys.
func (importer *FileImporter) Import(file *backup.File, rewriteRules *restore_util.RewriteRules) error {
func (importer *FileImporter) Import(file *backup.File, rewriteRules *RewriteRules) error {
log.Debug("import file", zap.Stringer("file", file))
// Rewrite the start key and end key of file to scan regions
startKey, endKey, err := rewriteFileKeys(file, rewriteRules)
Expand All @@ -179,9 +178,9 @@ func (importer *FileImporter) Import(file *backup.File, rewriteRules *restore_ut
ctx, cancel := context.WithTimeout(importer.ctx, importScanResgionTime)
defer cancel()
// Scan regions covered by the file range
regionInfos, err := importer.metaClient.ScanRegions(ctx, startKey, endKey, 0)
if err != nil {
return errors.Trace(err)
regionInfos, err1 := importer.metaClient.ScanRegions(ctx, startKey, endKey, 0)
if err1 != nil {
return errors.Trace(err1)
}
log.Debug("scan regions", zap.Stringer("file", file), zap.Int("count", len(regionInfos)))
// Try to download and ingest the file in every region
Expand All @@ -190,20 +189,20 @@ func (importer *FileImporter) Import(file *backup.File, rewriteRules *restore_ut
info := regionInfo
// Try to download file.
err = withRetry(func() error {
var err error
var err2 error
var isEmpty bool
downloadMeta, isEmpty, err = importer.downloadSST(info, file, rewriteRules)
if err != nil {
downloadMeta, isEmpty, err2 = importer.downloadSST(info, file, rewriteRules)
if err2 != nil {
if err != errRewriteRuleNotFound {
log.Warn("download file failed",
zap.Stringer("file", file),
zap.Stringer("region", info.Region),
zap.Binary("startKey", startKey),
zap.Binary("endKey", endKey),
zap.Error(err),
zap.Error(err2),
)
}
return err
return err2
}
if isEmpty {
log.Info(
Expand Down Expand Up @@ -255,9 +254,9 @@ func (importer *FileImporter) setDownloadSpeedLimit(storeID uint64) error {
}

func (importer *FileImporter) downloadSST(
regionInfo *restore_util.RegionInfo,
regionInfo *RegionInfo,
file *backup.File,
rewriteRules *restore_util.RewriteRules,
rewriteRules *RewriteRules,
) (*import_sstpb.SSTMeta, bool, error) {
id, err := uuid.New().MarshalBinary()
if err != nil {
Expand Down Expand Up @@ -312,7 +311,7 @@ func (importer *FileImporter) downloadSST(

func (importer *FileImporter) ingestSST(
sstMeta *import_sstpb.SSTMeta,
regionInfo *restore_util.RegionInfo,
regionInfo *RegionInfo,
) error {
leader := regionInfo.Leader
if leader == nil {
Expand Down
148 changes: 148 additions & 0 deletions pkg/restore/range.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,148 @@
package restore

import (
"bytes"
"fmt"

"github.com/google/btree"
"github.com/pingcap/errors"
"github.com/pingcap/kvproto/pkg/import_sstpb"
"github.com/pingcap/kvproto/pkg/metapb"
"github.com/pingcap/log"
"github.com/pingcap/tidb/tablecodec"
"go.uber.org/zap"
)

// Range represents a range of keys.
type Range struct {
StartKey []byte
EndKey []byte
}

// String formats a range to a string
func (r *Range) String() string {
return fmt.Sprintf("[%x %x]", r.StartKey, r.EndKey)
}

// Less compares a range with a btree.Item
func (r *Range) Less(than btree.Item) bool {
t := than.(*Range)
return len(r.EndKey) != 0 && bytes.Compare(r.EndKey, t.StartKey) <= 0
}

// contains returns if a key is included in the range.
func (r *Range) contains(key []byte) bool {
start, end := r.StartKey, r.EndKey
return bytes.Compare(key, start) >= 0 &&
(len(end) == 0 || bytes.Compare(key, end) < 0)
}

// sortRanges checks if the range overlapped and sort them
func sortRanges(ranges []Range, rewriteRules *RewriteRules) ([]Range, error) {
rangeTree := NewRangeTree()
for _, rg := range ranges {
if rewriteRules != nil {
startID := tablecodec.DecodeTableID(rg.StartKey)
endID := tablecodec.DecodeTableID(rg.EndKey)
var rule *import_sstpb.RewriteRule
if startID == endID {
rg.StartKey, rule = replacePrefix(rg.StartKey, rewriteRules)
if rule == nil {
log.Warn("cannot find rewrite rule", zap.Binary("key", rg.StartKey))
} else {
log.Debug(
"rewrite start key",
zap.Binary("key", rg.StartKey),
zap.Stringer("rule", rule))
}
rg.EndKey, rule = replacePrefix(rg.EndKey, rewriteRules)
if rule == nil {
log.Warn("cannot find rewrite rule", zap.Binary("key", rg.EndKey))
} else {
log.Debug(
"rewrite end key",
zap.Binary("key", rg.EndKey),
zap.Stringer("rule", rule))
}
} else {
log.Warn("table id does not match",
zap.Binary("startKey", rg.StartKey),
zap.Binary("endKey", rg.EndKey),
zap.Int64("startID", startID),
zap.Int64("endID", endID))
return nil, errors.New("table id does not match")
}
}
if out := rangeTree.InsertRange(rg); out != nil {
return nil, errors.Errorf("ranges overlapped: %s, %s", out, rg)
}
}
sortedRanges := make([]Range, 0, len(ranges))
rangeTree.Ascend(func(rg *Range) bool {
if rg == nil {
return false
}
sortedRanges = append(sortedRanges, *rg)
return true
})
return sortedRanges, nil
}

// RangeTree stores the ranges in an orderly manner.
// All the ranges it stored do not overlap.
type RangeTree struct {
tree *btree.BTree
}

// NewRangeTree returns a new RangeTree.
func NewRangeTree() *RangeTree {
return &RangeTree{tree: btree.New(32)}
}

// Find returns nil or a range in the range tree
func (rt *RangeTree) Find(key []byte) *Range {
var ret *Range
r := &Range{
StartKey: key,
}
rt.tree.DescendLessOrEqual(r, func(i btree.Item) bool {
ret = i.(*Range)
return false
})
if ret == nil || !ret.contains(key) {
return nil
}
return ret
}

// InsertRange inserts ranges into the range tree.
// it returns true if all ranges inserted successfully.
// it returns false if there are some overlapped ranges.
func (rt *RangeTree) InsertRange(rg Range) btree.Item {
return rt.tree.ReplaceOrInsert(&rg)
}

// RangeIterator allows callers of Ascend to iterate in-order over portions of
// the tree. When this function returns false, iteration will stop and the
// associated Ascend function will immediately return.
type RangeIterator func(rg *Range) bool

// Ascend calls the iterator for every value in the tree within [first, last],
// until the iterator returns false.
func (rt *RangeTree) Ascend(iterator RangeIterator) {
rt.tree.Ascend(func(i btree.Item) bool {
return iterator(i.(*Range))
})
}

// RegionInfo includes a region and the leader of the region.
type RegionInfo struct {
Region *metapb.Region
Leader *metapb.Peer
}

// RewriteRules contains rules for rewriting keys of tables.
type RewriteRules struct {
Table []*import_sstpb.RewriteRule
Data []*import_sstpb.RewriteRule
}
Loading