Skip to content
This repository has been archived by the owner on Jul 24, 2024. It is now read-only.

Commit

Permalink
Incremental BR: support DDL (#155)
Browse files Browse the repository at this point in the history
* support backup&restore ddl

Signed-off-by: 5kbpers <tangminghua@pingcap.com>

* integration tests

Signed-off-by: 5kbpers <tangminghua@pingcap.com>

* update kvproto

Signed-off-by: 5kbpers <tangminghua@pingcap.com>

* fix integration tests

Signed-off-by: 5kbpers <tangminghua@pingcap.com>

* reduce cyclomatic complexity of `runRestore`

Signed-off-by: 5kbpers <tangminghua@pingcap.com>

* fix test

Signed-off-by: 5kbpers <tangminghua@pingcap.com>

* add unit test

Signed-off-by: 5kbpers <tangminghua@pingcap.com>

* fix tests

Signed-off-by: 5kbpers <tangminghua@pingcap.com>

* disable fast checksum in incremental br

Signed-off-by: 5kbpers <tangminghua@pingcap.com>

* fix no valid key error

Signed-off-by: 5kbpers <tangminghua@pingcap.com>

* address lint

Signed-off-by: 5kbpers <tangminghua@pingcap.com>

* address comments

Signed-off-by: 5kbpers <tangminghua@pingcap.com>
  • Loading branch information
5kbpers authored Feb 20, 2020
1 parent 008ec45 commit 1e1fc97
Show file tree
Hide file tree
Showing 21 changed files with 527 additions and 90 deletions.
10 changes: 5 additions & 5 deletions cmd/validate.go
Original file line number Diff line number Diff line change
Expand Up @@ -196,19 +196,19 @@ func newBackupMetaCommand() *cobra.Command {
newTable := new(model.TableInfo)
tableID, _ := tableIDAllocator.Alloc()
newTable.ID = int64(tableID)
newTable.Name = table.Schema.Name
newTable.Indices = make([]*model.IndexInfo, len(table.Schema.Indices))
for i, indexInfo := range table.Schema.Indices {
newTable.Name = table.Info.Name
newTable.Indices = make([]*model.IndexInfo, len(table.Info.Indices))
for i, indexInfo := range table.Info.Indices {
indexID, _ := indexIDAllocator.Alloc()
newTable.Indices[i] = &model.IndexInfo{
ID: int64(indexID),
Name: indexInfo.Name,
}
}
rules := restore.GetRewriteRules(newTable, table.Schema, 0)
rules := restore.GetRewriteRules(newTable, table.Info, 0)
rewriteRules.Table = append(rewriteRules.Table, rules.Table...)
rewriteRules.Data = append(rewriteRules.Data, rules.Data...)
tableIDMap[table.Schema.ID] = int64(tableID)
tableIDMap[table.Info.ID] = int64(tableID)
}
// Validate rewrite rules
for _, file := range files {
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ require (
github.com/onsi/gomega v1.7.1 // indirect
github.com/pingcap/check v0.0.0-20191216031241-8a5a85928f12
github.com/pingcap/errors v0.11.5-0.20190809092503-95897b64e011
github.com/pingcap/kvproto v0.0.0-20200108025604-a4dc183d2af5
github.com/pingcap/kvproto v0.0.0-20200210234432-a965739f8162
github.com/pingcap/log v0.0.0-20191012051959-b742a5d432e9
github.com/pingcap/parser v0.0.0-20200109073933-a9496438d77d
github.com/pingcap/pd v1.1.0-beta.0.20191219054547-4d65bbefbc6d
Expand Down
11 changes: 11 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ github.com/VividCortex/ewma v1.1.1 h1:MnEK4VOv6n0RSY4vtRe3h11qjxL3+t0B8yOL8iMXdc
github.com/VividCortex/ewma v1.1.1/go.mod h1:2Tkkvm3sRDVXaiyucHiACn4cqf7DpdyLvmxzcbUokwA=
github.com/alecthomas/template v0.0.0-20160405071501-a0175ee3bccc/go.mod h1:LOuyumcjzFXgccqObfd/Ljyb9UuFJ6TxHnclSeseNhc=
github.com/alecthomas/units v0.0.0-20151022065526-2efee857e7cf/go.mod h1:ybxpYRFXyAe+OPACYpWeL0wqObRcbAqCMya13uyzqw0=
github.com/antihax/optional v0.0.0-20180407024304-ca021399b1a6/go.mod h1:V8iCPQYkqmusNa815XgQio277wI47sdRh1dUOLdyC6Q=
github.com/armon/consul-api v0.0.0-20180202201655-eb2c6b5be1b6/go.mod h1:grANhF5doyWs3UAsr3K4I6qtAmlQcZDesFNEHPZAzj8=
github.com/aws/aws-sdk-go v1.26.1 h1:JGQggXhOiNJIqsmbYUl3cYtJZUffeOWlHtxfzGK7WPI=
github.com/aws/aws-sdk-go v1.26.1/go.mod h1:KmX6BPdI08NWTb3/sm4ZGu5ShLoqVDhKgpiN924inxo=
Expand Down Expand Up @@ -182,6 +183,8 @@ github.com/grpc-ecosystem/go-grpc-prometheus v1.2.0/go.mod h1:8NvIoxWQoOIhqOTXgf
github.com/grpc-ecosystem/grpc-gateway v1.4.1/go.mod h1:RSKVYQBd5MCa4OVpNdGskqpgL2+G+NZTnrVHpWWfpdw=
github.com/grpc-ecosystem/grpc-gateway v1.9.5 h1:UImYN5qQ8tuGpGE16ZmjvcTtTw24zw1QAp/SlnNrZhI=
github.com/grpc-ecosystem/grpc-gateway v1.9.5/go.mod h1:vNeuVxBJEsws4ogUvrchl83t/GYV9WGTSLVdBhOQFDY=
github.com/grpc-ecosystem/grpc-gateway v1.12.1 h1:zCy2xE9ablevUOrUZc3Dl72Dt+ya2FNAvC2yLYMHzi4=
github.com/grpc-ecosystem/grpc-gateway v1.12.1/go.mod h1:8XEsbTttt/W+VvjtQhLACqCisSPWTxCZ7sBRjU6iH9c=
github.com/hashicorp/golang-lru v0.5.0/go.mod h1:/m3WP610KZHVQ1SGc6re/UDhFvYD7pJ4Ao+sR/qLZy8=
github.com/hashicorp/golang-lru v0.5.1 h1:0hERBMJE1eitiLkihrMvRVBYAkpHzc/J3QdDN+dAcgU=
github.com/hashicorp/golang-lru v0.5.1/go.mod h1:/m3WP610KZHVQ1SGc6re/UDhFvYD7pJ4Ao+sR/qLZy8=
Expand Down Expand Up @@ -300,6 +303,8 @@ github.com/pingcap/kvproto v0.0.0-20191211054548-3c6b38ea5107/go.mod h1:WWLmULLO
github.com/pingcap/kvproto v0.0.0-20191213111810-93cb7c623c8b/go.mod h1:WWLmULLO7l8IOcQG+t+ItJ3fEcrL5FxF0Wu+HrMy26w=
github.com/pingcap/kvproto v0.0.0-20200108025604-a4dc183d2af5 h1:RUxQExD5yubAjWGnw8kcxfO9abbiVHIE1rbuCyQCWDE=
github.com/pingcap/kvproto v0.0.0-20200108025604-a4dc183d2af5/go.mod h1:WWLmULLO7l8IOcQG+t+ItJ3fEcrL5FxF0Wu+HrMy26w=
github.com/pingcap/kvproto v0.0.0-20200210234432-a965739f8162 h1:lsoIoCoXMpcHvW6jHcqP/prA4I6duAp1DVyG2ULz4bM=
github.com/pingcap/kvproto v0.0.0-20200210234432-a965739f8162/go.mod h1:IOdRDPLyda8GX2hE/jO7gqaCV/PNFh8BZQCQZXfIOqI=
github.com/pingcap/log v0.0.0-20191012051959-b742a5d432e9 h1:AJD9pZYm72vMgPcQDww9rkZ1DnWfl0pXV3BOWlkYIjA=
github.com/pingcap/log v0.0.0-20191012051959-b742a5d432e9/go.mod h1:4rbK1p9ILyIfb6hU7OG2CiWSqMXnp3JMbiaVJ6mvoY8=
github.com/pingcap/parser v0.0.0-20200109073933-a9496438d77d h1:4QwSJRxmBjTB9ssJNWg2f2bDm5rqnHCUUjMh4N1QOOY=
Expand Down Expand Up @@ -342,6 +347,7 @@ github.com/prometheus/procfs v0.0.2/go.mod h1:TjEm7ze935MbeOT/UhFTIMYKhuLP4wbCsT
github.com/remyoudompheng/bigfft v0.0.0-20190728182440-6a916e37a237 h1:HQagqIiBmr8YXawX/le3+O26N+vPPC1PtjaF3mwnook=
github.com/remyoudompheng/bigfft v0.0.0-20190728182440-6a916e37a237/go.mod h1:qqbHyh8v60DhA7CoWK5oRCqLrMHRGoxYCSS9EjAz6Eo=
github.com/rogpeppe/fastuuid v0.0.0-20150106093220-6724a57986af/go.mod h1:XWv6SoW27p1b0cqNHllgS5HIMJraePCO15w5zCzIWYg=
github.com/rogpeppe/fastuuid v1.2.0/go.mod h1:jVj6XXZzXRy/MSR5jhDC/2q6DgLz+nrA6LYCDYWNEvQ=
github.com/rogpeppe/go-internal v1.3.0/go.mod h1:M8bDsm7K2OlrFYOpmOWEs/qY81heoFRclV5y23lUDJ4=
github.com/russross/blackfriday v1.5.2/go.mod h1:JO/DiYxRf+HjHt06OyowR9PTA263kcR/rfWxYHBV53g=
github.com/sergi/go-diff v1.0.1-0.20180205163309-da645544ed44/go.mod h1:0CfEIISq7TuYL3j771MWULgwwjU+GofnZX9QAmXWZgo=
Expand Down Expand Up @@ -496,6 +502,7 @@ golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLL
golang.org/x/net v0.0.0-20190813141303-74dc4d7220e7/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
golang.org/x/net v0.0.0-20190909003024-a7b16738d86b h1:XfVGCX+0T4WOStkaOsJRllbsiImhB2jgVBGc9L0lPGc=
golang.org/x/net v0.0.0-20190909003024-a7b16738d86b/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
golang.org/x/net v0.0.0-20191002035440-2ec189313ef0/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
golang.org/x/net v0.0.0-20191011234655-491137f69257 h1:ry8e2D+cwaV6hk7lb3aRTjjZo24shrbK0e11QEOkTIg=
golang.org/x/net v0.0.0-20191011234655-491137f69257/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U=
Expand Down Expand Up @@ -563,6 +570,7 @@ golang.org/x/tools v0.0.0-20191107010934-f79515f33823 h1:akkRBeitX2EZP59KdtKw310
golang.org/x/tools v0.0.0-20191107010934-f79515f33823/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo=
golang.org/x/tools v0.0.0-20191115202509-3a792d9c32b2 h1:EtTFh6h4SAKemS+CURDMTDIANuduG5zKEXShyy18bGA=
golang.org/x/tools v0.0.0-20191115202509-3a792d9c32b2/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo=
golang.org/x/tools v0.0.0-20191125144606-a911d9008d1f h1:kDxGY2VmgABOe55qheT/TFqUMtcTHnomIPS1iv3G4Ms=
golang.org/x/tools v0.0.0-20191125144606-a911d9008d1f/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo=
golang.org/x/tools v0.0.0-20200107184032-11e9d9cc0042 h1:BKiPVwWbEdmAh+5CBwk13CYeVJQRDJpDnKgDyMOGz9M=
golang.org/x/tools v0.0.0-20200107184032-11e9d9cc0042/go.mod h1:TB2adYChydJhpapKDTa4BR/hXlZSLoq2Wpct/0txZ28=
Expand Down Expand Up @@ -596,6 +604,7 @@ google.golang.org/genproto v0.0.0-20190819201941-24fa4b261c55/go.mod h1:DMBHOl98
google.golang.org/genproto v0.0.0-20190905072037-92dd089d5514 h1:oFSK4421fpCKRrpzIpybyBVWyht05NegY9+L/3TLAZs=
google.golang.org/genproto v0.0.0-20190905072037-92dd089d5514/go.mod h1:DMBHOl98Agz4BDEuKkezgsaosCRResVns1a3J2ZsMNc=
google.golang.org/genproto v0.0.0-20190911173649-1774047e7e51/go.mod h1:IbNlFCBrqXvoKpeg0TB2l7cyZUmoaFKYIwrEpbDKLA8=
google.golang.org/genproto v0.0.0-20190927181202-20e1ac93f88c/go.mod h1:IbNlFCBrqXvoKpeg0TB2l7cyZUmoaFKYIwrEpbDKLA8=
google.golang.org/genproto v0.0.0-20191115194625-c23dd37a84c9 h1:6XzpBoANz1NqMNfDXzc2QmHmbb1vyMsvRfoP5rM+K1I=
google.golang.org/genproto v0.0.0-20191115194625-c23dd37a84c9/go.mod h1:n3cpQtvxv34hfy77yVDNjmbRyujviMdxYliBSkLhpCc=
google.golang.org/grpc v0.0.0-20180607172857-7a6a684ca69e/go.mod h1:yo6s7OP7yaDglbqo1J04qKzAhqBH6lvTonzMVmEdcZw=
Expand All @@ -605,6 +614,7 @@ google.golang.org/grpc v1.20.1/go.mod h1:10oTOabMzJvdu6/UiuZezV6QK5dSlG84ov/aaiq
google.golang.org/grpc v1.21.1/go.mod h1:oYelfM1adQP15Ek0mdvEgi9Df8B9CZIaU1084ijfRaM=
google.golang.org/grpc v1.23.0/go.mod h1:Y5yQAOtifL1yxbo5wqy6BxZv8vAUGQwXBOALyacEbxg=
google.golang.org/grpc v1.23.1/go.mod h1:Y5yQAOtifL1yxbo5wqy6BxZv8vAUGQwXBOALyacEbxg=
google.golang.org/grpc v1.24.0/go.mod h1:XDChyiUovWa60DnaeDeZmSW86xtLtjtZbwvSiRnRtcA=
google.golang.org/grpc v1.25.1 h1:wdKvqQk7IttEw92GoRyKG2IDrUIpgpj6H6m81yfeMW0=
google.golang.org/grpc v1.25.1/go.mod h1:c3i+UQWmh7LiEpx4sFZnkU36qjEYZ0imhYfXVyQciAY=
gopkg.in/airbrake/gobrake.v2 v2.0.9/go.mod h1:/h5ZAUhDkGaJfjzjKLSjv6zCL6O0LLBxU4K+aSYdM/U=
Expand All @@ -629,6 +639,7 @@ gopkg.in/yaml.v2 v2.0.0-20170812160011-eb3733d160e7/go.mod h1:JAlM8MvJe8wmxCU4Bl
gopkg.in/yaml.v2 v2.2.1/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
gopkg.in/yaml.v2 v2.2.2 h1:ZCJp+EgiOT7lHqUV2J862kp8Qj64Jo6az82+3Td9dZw=
gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
gopkg.in/yaml.v2 v2.2.3/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
gopkg.in/yaml.v2 v2.2.4 h1:/eiJrUcujPVeJ3xlSWaiNi3uSVmDGBK1pDHUHAnao1I=
gopkg.in/yaml.v2 v2.2.4/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
honnef.co/go/tools v0.0.0-20190102054323-c2f93a96b099/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4=
Expand Down
55 changes: 53 additions & 2 deletions pkg/backup/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"github.com/pingcap/tidb/distsql"
"github.com/pingcap/tidb/domain"
"github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/meta"
"github.com/pingcap/tidb/meta/autoid"
"github.com/pingcap/tidb/store/tikv"
"github.com/pingcap/tidb/store/tikv/oracle"
Expand Down Expand Up @@ -119,15 +120,20 @@ func (bc *Client) SetStorage(ctx context.Context, backend *backup.StorageBackend
}

// SaveBackupMeta saves the current backup meta at the given path.
func (bc *Client) SaveBackupMeta(ctx context.Context) error {
func (bc *Client) SaveBackupMeta(ctx context.Context, ddlJobs []*model.Job) error {
ddlJobsData, err := json.Marshal(ddlJobs)
if err != nil {
return errors.Trace(err)
}
bc.backupMeta.Ddls = ddlJobsData
backupMetaData, err := proto.Marshal(&bc.backupMeta)
if err != nil {
return errors.Trace(err)
}
log.Debug("backup meta",
zap.Reflect("meta", bc.backupMeta))
backendURL := storage.FormatBackendURL(bc.backend)
log.Info("save backup meta", zap.Stringer("path", &backendURL))
log.Info("save backup meta", zap.Stringer("path", &backendURL), zap.Int("jobs", len(ddlJobs)))
return bc.storage.Write(ctx, utils.MetaFile, backupMetaData)
}

Expand Down Expand Up @@ -241,6 +247,51 @@ func BuildBackupRangeAndSchema(
return ranges, backupSchemas, nil
}

// GetBackupDDLJobs returns the ddl jobs are done in (lastBackupTS, backupTS]
func GetBackupDDLJobs(dom *domain.Domain, lastBackupTS, backupTS uint64) ([]*model.Job, error) {
snapMeta, err := dom.GetSnapshotMeta(backupTS)
if err != nil {
return nil, errors.Trace(err)
}
lastSnapMeta, err := dom.GetSnapshotMeta(lastBackupTS)
if err != nil {
return nil, errors.Trace(err)
}
lastSchemaVersion, err := lastSnapMeta.GetSchemaVersion()
if err != nil {
return nil, errors.Trace(err)
}
allJobs := make([]*model.Job, 0)
defaultJobs, err := snapMeta.GetAllDDLJobsInQueue(meta.DefaultJobListKey)
if err != nil {
return nil, errors.Trace(err)
}
log.Debug("get default jobs", zap.Int("jobs", len(defaultJobs)))
allJobs = append(allJobs, defaultJobs...)
addIndexJobs, err := snapMeta.GetAllDDLJobsInQueue(meta.AddIndexJobListKey)
if err != nil {
return nil, errors.Trace(err)
}
log.Debug("get add index jobs", zap.Int("jobs", len(addIndexJobs)))
allJobs = append(allJobs, addIndexJobs...)
historyJobs, err := snapMeta.GetAllHistoryDDLJobs()
if err != nil {
return nil, errors.Trace(err)
}
log.Debug("get history jobs", zap.Int("jobs", len(historyJobs)))
allJobs = append(allJobs, historyJobs...)

completedJobs := make([]*model.Job, 0)
for _, job := range allJobs {
if (job.State == model.JobStateDone || job.State == model.JobStateSynced) &&
(job.BinlogInfo != nil && job.BinlogInfo.SchemaVersion > lastSchemaVersion) {
completedJobs = append(completedJobs, job)
}
}
log.Debug("get completed jobs", zap.Int("jobs", len(completedJobs)))
return completedJobs, nil
}

// BackupRanges make a backup of the given key ranges.
func (bc *Client) BackupRanges(
ctx context.Context,
Expand Down
8 changes: 4 additions & 4 deletions pkg/checksum/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ func buildChecksumRequest(
reqs := make([]*kv.Request, 0, (len(newTable.Indices)+1)*(len(partDefs)+1))
var oldTableID int64
if oldTable != nil {
oldTableID = oldTable.Schema.ID
oldTableID = oldTable.Info.ID
}
rs, err := buildRequest(newTable, newTable.ID, oldTable, oldTableID, startTS)
if err != nil {
Expand All @@ -72,7 +72,7 @@ func buildChecksumRequest(
for _, partDef := range partDefs {
var oldPartID int64
if oldTable != nil {
for _, oldPartDef := range oldTable.Schema.Partition.Definitions {
for _, oldPartDef := range oldTable.Info.Partition.Definitions {
if oldPartDef.Name == partDef.Name {
oldPartID = oldPartDef.ID
}
Expand Down Expand Up @@ -108,7 +108,7 @@ func buildRequest(
}
var oldIndexInfo *model.IndexInfo
if oldTable != nil {
for _, oldIndex := range oldTable.Schema.Indices {
for _, oldIndex := range oldTable.Info.Indices {
if oldIndex.Name == indexInfo.Name {
oldIndexInfo = oldIndex
break
Expand All @@ -117,7 +117,7 @@ func buildRequest(
if oldIndexInfo == nil {
log.Panic("index not found",
zap.Reflect("table", tableInfo),
zap.Reflect("oldTable", oldTable.Schema),
zap.Reflect("oldTable", oldTable.Info),
zap.Stringer("index", indexInfo.Name))
}
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/checksum/executor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ func (s *testChecksumSuite) TestChecksum(c *C) {
// Test rewrite rules
tk.MustExec("alter table t1 add index i2(a);")
tableInfo1 = s.getTableInfo(c, "test", "t1")
oldTable := utils.Table{Schema: tableInfo1}
oldTable := utils.Table{Info: tableInfo1}
exe2, err = NewExecutorBuilder(tableInfo2, math.MaxUint64).
SetOldTable(&oldTable).Build()
c.Assert(err, IsNil)
Expand Down
41 changes: 38 additions & 3 deletions pkg/restore/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,9 @@ package restore

import (
"context"
"encoding/json"
"math"
"sort"
"sync"
"time"

Expand Down Expand Up @@ -40,6 +42,7 @@ type Client struct {
tableWorkerPool *utils.WorkerPool

databases map[string]*utils.Database
ddlJobs []*model.Job
backupMeta *backup.BackupMeta
db *DB
rateLimit uint64
Expand Down Expand Up @@ -97,8 +100,15 @@ func (rc *Client) InitBackupMeta(backupMeta *backup.BackupMeta, backend *backup.
if err != nil {
return errors.Trace(err)
}
var ddlJobs []*model.Job
err = json.Unmarshal(backupMeta.GetDdls(), &ddlJobs)
if err != nil {
return errors.Trace(err)
}
rc.databases = databases
rc.ddlJobs = ddlJobs
rc.backupMeta = backupMeta
log.Info("load backupmeta", zap.Int("databases", len(rc.databases)), zap.Int("jobs", len(rc.ddlJobs)))

metaClient := NewSplitClient(rc.pdClient)
importClient := NewImportClient(metaClient)
Expand Down Expand Up @@ -151,6 +161,11 @@ func (rc *Client) GetDatabase(name string) *utils.Database {
return rc.databases[name]
}

// GetDDLJobs returns ddl jobs
func (rc *Client) GetDDLJobs() []*model.Job {
return rc.ddlJobs
}

// GetTableSchema returns the schema of a table from TiDB.
func (rc *Client) GetTableSchema(
dom *domain.Domain,
Expand Down Expand Up @@ -189,18 +204,38 @@ func (rc *Client) CreateTables(
if err != nil {
return nil, nil, err
}
newTableInfo, err := rc.GetTableSchema(dom, table.Db.Name, table.Schema.Name)
newTableInfo, err := rc.GetTableSchema(dom, table.Db.Name, table.Info.Name)
if err != nil {
return nil, nil, err
}
rules := GetRewriteRules(newTableInfo, table.Schema, newTS)
rules := GetRewriteRules(newTableInfo, table.Info, newTS)
rewriteRules.Table = append(rewriteRules.Table, rules.Table...)
rewriteRules.Data = append(rewriteRules.Data, rules.Data...)
newTables = append(newTables, newTableInfo)
}
return rewriteRules, newTables, nil
}

// ExecDDLs executes the queries of the ddl jobs.
func (rc *Client) ExecDDLs(ddlJobs []*model.Job) error {
// Sort the ddl jobs by schema version in ascending order.
sort.Slice(ddlJobs, func(i, j int) bool {
return ddlJobs[i].BinlogInfo.SchemaVersion < ddlJobs[j].BinlogInfo.SchemaVersion
})

for _, job := range ddlJobs {
err := rc.db.ExecDDL(rc.ctx, job)
if err != nil {
return errors.Trace(err)
}
log.Info("execute ddl query",
zap.String("db", job.SchemaName),
zap.String("query", job.Query),
zap.Int64("historySchemaVersion", job.BinlogInfo.SchemaVersion))
}
return nil
}

func (rc *Client) setSpeedLimit() error {
if !rc.hasSpeedLimited && rc.rateLimit != 0 {
stores, err := rc.pdClient.GetAllStores(rc.ctx, pd.WithExcludeTombstone())
Expand Down Expand Up @@ -380,7 +415,7 @@ func (rc *Client) ValidateChecksum(
checksumResp.TotalBytes != table.TotalBytes {
log.Error("failed in validate checksum",
zap.String("database", table.Db.Name.L),
zap.String("table", table.Schema.Name.L),
zap.String("table", table.Info.Name.L),
zap.Uint64("origin tidb crc64", table.Crc64Xor),
zap.Uint64("calculated crc64", checksumResp.Checksum),
zap.Uint64("origin tidb total kvs", table.TotalKvs),
Expand Down
2 changes: 1 addition & 1 deletion pkg/restore/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ func (s *testRestoreClientSuite) TestCreateTables(c *C) {
for i := len(tables) - 1; i >= 0; i-- {
tables[i] = &utils.Table{
Db: dbSchema,
Schema: &model.TableInfo{
Info: &model.TableInfo{
ID: int64(i),
Name: model.NewCIStr("test" + strconv.Itoa(i)),
Columns: []*model.ColumnInfo{{
Expand Down
Loading

0 comments on commit 1e1fc97

Please sign in to comment.