diff --git a/cmd/validate.go b/cmd/validate.go index 559cb9983..8bca7e553 100644 --- a/cmd/validate.go +++ b/cmd/validate.go @@ -196,19 +196,19 @@ func newBackupMetaCommand() *cobra.Command { newTable := new(model.TableInfo) tableID, _ := tableIDAllocator.Alloc() newTable.ID = int64(tableID) - newTable.Name = table.Schema.Name - newTable.Indices = make([]*model.IndexInfo, len(table.Schema.Indices)) - for i, indexInfo := range table.Schema.Indices { + newTable.Name = table.Info.Name + newTable.Indices = make([]*model.IndexInfo, len(table.Info.Indices)) + for i, indexInfo := range table.Info.Indices { indexID, _ := indexIDAllocator.Alloc() newTable.Indices[i] = &model.IndexInfo{ ID: int64(indexID), Name: indexInfo.Name, } } - rules := restore.GetRewriteRules(newTable, table.Schema, 0) + rules := restore.GetRewriteRules(newTable, table.Info, 0) rewriteRules.Table = append(rewriteRules.Table, rules.Table...) rewriteRules.Data = append(rewriteRules.Data, rules.Data...) - tableIDMap[table.Schema.ID] = int64(tableID) + tableIDMap[table.Info.ID] = int64(tableID) } // Validate rewrite rules for _, file := range files { diff --git a/go.mod b/go.mod index 180c58f93..7d9e6b77e 100644 --- a/go.mod +++ b/go.mod @@ -20,7 +20,7 @@ require ( github.com/onsi/gomega v1.7.1 // indirect github.com/pingcap/check v0.0.0-20191216031241-8a5a85928f12 github.com/pingcap/errors v0.11.5-0.20190809092503-95897b64e011 - github.com/pingcap/kvproto v0.0.0-20200108025604-a4dc183d2af5 + github.com/pingcap/kvproto v0.0.0-20200210234432-a965739f8162 github.com/pingcap/log v0.0.0-20191012051959-b742a5d432e9 github.com/pingcap/parser v0.0.0-20200109073933-a9496438d77d github.com/pingcap/pd v1.1.0-beta.0.20191219054547-4d65bbefbc6d diff --git a/go.sum b/go.sum index 0fe4a3024..24f73f5c9 100644 --- a/go.sum +++ b/go.sum @@ -28,6 +28,7 @@ github.com/VividCortex/ewma v1.1.1 h1:MnEK4VOv6n0RSY4vtRe3h11qjxL3+t0B8yOL8iMXdc github.com/VividCortex/ewma v1.1.1/go.mod h1:2Tkkvm3sRDVXaiyucHiACn4cqf7DpdyLvmxzcbUokwA= github.com/alecthomas/template v0.0.0-20160405071501-a0175ee3bccc/go.mod h1:LOuyumcjzFXgccqObfd/Ljyb9UuFJ6TxHnclSeseNhc= github.com/alecthomas/units v0.0.0-20151022065526-2efee857e7cf/go.mod h1:ybxpYRFXyAe+OPACYpWeL0wqObRcbAqCMya13uyzqw0= +github.com/antihax/optional v0.0.0-20180407024304-ca021399b1a6/go.mod h1:V8iCPQYkqmusNa815XgQio277wI47sdRh1dUOLdyC6Q= github.com/armon/consul-api v0.0.0-20180202201655-eb2c6b5be1b6/go.mod h1:grANhF5doyWs3UAsr3K4I6qtAmlQcZDesFNEHPZAzj8= github.com/aws/aws-sdk-go v1.26.1 h1:JGQggXhOiNJIqsmbYUl3cYtJZUffeOWlHtxfzGK7WPI= github.com/aws/aws-sdk-go v1.26.1/go.mod h1:KmX6BPdI08NWTb3/sm4ZGu5ShLoqVDhKgpiN924inxo= @@ -182,6 +183,8 @@ github.com/grpc-ecosystem/go-grpc-prometheus v1.2.0/go.mod h1:8NvIoxWQoOIhqOTXgf github.com/grpc-ecosystem/grpc-gateway v1.4.1/go.mod h1:RSKVYQBd5MCa4OVpNdGskqpgL2+G+NZTnrVHpWWfpdw= github.com/grpc-ecosystem/grpc-gateway v1.9.5 h1:UImYN5qQ8tuGpGE16ZmjvcTtTw24zw1QAp/SlnNrZhI= github.com/grpc-ecosystem/grpc-gateway v1.9.5/go.mod h1:vNeuVxBJEsws4ogUvrchl83t/GYV9WGTSLVdBhOQFDY= +github.com/grpc-ecosystem/grpc-gateway v1.12.1 h1:zCy2xE9ablevUOrUZc3Dl72Dt+ya2FNAvC2yLYMHzi4= +github.com/grpc-ecosystem/grpc-gateway v1.12.1/go.mod h1:8XEsbTttt/W+VvjtQhLACqCisSPWTxCZ7sBRjU6iH9c= github.com/hashicorp/golang-lru v0.5.0/go.mod h1:/m3WP610KZHVQ1SGc6re/UDhFvYD7pJ4Ao+sR/qLZy8= github.com/hashicorp/golang-lru v0.5.1 h1:0hERBMJE1eitiLkihrMvRVBYAkpHzc/J3QdDN+dAcgU= github.com/hashicorp/golang-lru v0.5.1/go.mod h1:/m3WP610KZHVQ1SGc6re/UDhFvYD7pJ4Ao+sR/qLZy8= @@ -300,6 +303,8 @@ github.com/pingcap/kvproto v0.0.0-20191211054548-3c6b38ea5107/go.mod h1:WWLmULLO github.com/pingcap/kvproto v0.0.0-20191213111810-93cb7c623c8b/go.mod h1:WWLmULLO7l8IOcQG+t+ItJ3fEcrL5FxF0Wu+HrMy26w= github.com/pingcap/kvproto v0.0.0-20200108025604-a4dc183d2af5 h1:RUxQExD5yubAjWGnw8kcxfO9abbiVHIE1rbuCyQCWDE= github.com/pingcap/kvproto v0.0.0-20200108025604-a4dc183d2af5/go.mod h1:WWLmULLO7l8IOcQG+t+ItJ3fEcrL5FxF0Wu+HrMy26w= +github.com/pingcap/kvproto v0.0.0-20200210234432-a965739f8162 h1:lsoIoCoXMpcHvW6jHcqP/prA4I6duAp1DVyG2ULz4bM= +github.com/pingcap/kvproto v0.0.0-20200210234432-a965739f8162/go.mod h1:IOdRDPLyda8GX2hE/jO7gqaCV/PNFh8BZQCQZXfIOqI= github.com/pingcap/log v0.0.0-20191012051959-b742a5d432e9 h1:AJD9pZYm72vMgPcQDww9rkZ1DnWfl0pXV3BOWlkYIjA= github.com/pingcap/log v0.0.0-20191012051959-b742a5d432e9/go.mod h1:4rbK1p9ILyIfb6hU7OG2CiWSqMXnp3JMbiaVJ6mvoY8= github.com/pingcap/parser v0.0.0-20200109073933-a9496438d77d h1:4QwSJRxmBjTB9ssJNWg2f2bDm5rqnHCUUjMh4N1QOOY= @@ -342,6 +347,7 @@ github.com/prometheus/procfs v0.0.2/go.mod h1:TjEm7ze935MbeOT/UhFTIMYKhuLP4wbCsT github.com/remyoudompheng/bigfft v0.0.0-20190728182440-6a916e37a237 h1:HQagqIiBmr8YXawX/le3+O26N+vPPC1PtjaF3mwnook= github.com/remyoudompheng/bigfft v0.0.0-20190728182440-6a916e37a237/go.mod h1:qqbHyh8v60DhA7CoWK5oRCqLrMHRGoxYCSS9EjAz6Eo= github.com/rogpeppe/fastuuid v0.0.0-20150106093220-6724a57986af/go.mod h1:XWv6SoW27p1b0cqNHllgS5HIMJraePCO15w5zCzIWYg= +github.com/rogpeppe/fastuuid v1.2.0/go.mod h1:jVj6XXZzXRy/MSR5jhDC/2q6DgLz+nrA6LYCDYWNEvQ= github.com/rogpeppe/go-internal v1.3.0/go.mod h1:M8bDsm7K2OlrFYOpmOWEs/qY81heoFRclV5y23lUDJ4= github.com/russross/blackfriday v1.5.2/go.mod h1:JO/DiYxRf+HjHt06OyowR9PTA263kcR/rfWxYHBV53g= github.com/sergi/go-diff v1.0.1-0.20180205163309-da645544ed44/go.mod h1:0CfEIISq7TuYL3j771MWULgwwjU+GofnZX9QAmXWZgo= @@ -496,6 +502,7 @@ golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLL golang.org/x/net v0.0.0-20190813141303-74dc4d7220e7/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= golang.org/x/net v0.0.0-20190909003024-a7b16738d86b h1:XfVGCX+0T4WOStkaOsJRllbsiImhB2jgVBGc9L0lPGc= golang.org/x/net v0.0.0-20190909003024-a7b16738d86b/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= +golang.org/x/net v0.0.0-20191002035440-2ec189313ef0/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= golang.org/x/net v0.0.0-20191011234655-491137f69257 h1:ry8e2D+cwaV6hk7lb3aRTjjZo24shrbK0e11QEOkTIg= golang.org/x/net v0.0.0-20191011234655-491137f69257/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U= @@ -563,6 +570,7 @@ golang.org/x/tools v0.0.0-20191107010934-f79515f33823 h1:akkRBeitX2EZP59KdtKw310 golang.org/x/tools v0.0.0-20191107010934-f79515f33823/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo= golang.org/x/tools v0.0.0-20191115202509-3a792d9c32b2 h1:EtTFh6h4SAKemS+CURDMTDIANuduG5zKEXShyy18bGA= golang.org/x/tools v0.0.0-20191115202509-3a792d9c32b2/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo= +golang.org/x/tools v0.0.0-20191125144606-a911d9008d1f h1:kDxGY2VmgABOe55qheT/TFqUMtcTHnomIPS1iv3G4Ms= golang.org/x/tools v0.0.0-20191125144606-a911d9008d1f/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo= golang.org/x/tools v0.0.0-20200107184032-11e9d9cc0042 h1:BKiPVwWbEdmAh+5CBwk13CYeVJQRDJpDnKgDyMOGz9M= golang.org/x/tools v0.0.0-20200107184032-11e9d9cc0042/go.mod h1:TB2adYChydJhpapKDTa4BR/hXlZSLoq2Wpct/0txZ28= @@ -596,6 +604,7 @@ google.golang.org/genproto v0.0.0-20190819201941-24fa4b261c55/go.mod h1:DMBHOl98 google.golang.org/genproto v0.0.0-20190905072037-92dd089d5514 h1:oFSK4421fpCKRrpzIpybyBVWyht05NegY9+L/3TLAZs= google.golang.org/genproto v0.0.0-20190905072037-92dd089d5514/go.mod h1:DMBHOl98Agz4BDEuKkezgsaosCRResVns1a3J2ZsMNc= google.golang.org/genproto v0.0.0-20190911173649-1774047e7e51/go.mod h1:IbNlFCBrqXvoKpeg0TB2l7cyZUmoaFKYIwrEpbDKLA8= +google.golang.org/genproto v0.0.0-20190927181202-20e1ac93f88c/go.mod h1:IbNlFCBrqXvoKpeg0TB2l7cyZUmoaFKYIwrEpbDKLA8= google.golang.org/genproto v0.0.0-20191115194625-c23dd37a84c9 h1:6XzpBoANz1NqMNfDXzc2QmHmbb1vyMsvRfoP5rM+K1I= google.golang.org/genproto v0.0.0-20191115194625-c23dd37a84c9/go.mod h1:n3cpQtvxv34hfy77yVDNjmbRyujviMdxYliBSkLhpCc= google.golang.org/grpc v0.0.0-20180607172857-7a6a684ca69e/go.mod h1:yo6s7OP7yaDglbqo1J04qKzAhqBH6lvTonzMVmEdcZw= @@ -605,6 +614,7 @@ google.golang.org/grpc v1.20.1/go.mod h1:10oTOabMzJvdu6/UiuZezV6QK5dSlG84ov/aaiq google.golang.org/grpc v1.21.1/go.mod h1:oYelfM1adQP15Ek0mdvEgi9Df8B9CZIaU1084ijfRaM= google.golang.org/grpc v1.23.0/go.mod h1:Y5yQAOtifL1yxbo5wqy6BxZv8vAUGQwXBOALyacEbxg= google.golang.org/grpc v1.23.1/go.mod h1:Y5yQAOtifL1yxbo5wqy6BxZv8vAUGQwXBOALyacEbxg= +google.golang.org/grpc v1.24.0/go.mod h1:XDChyiUovWa60DnaeDeZmSW86xtLtjtZbwvSiRnRtcA= google.golang.org/grpc v1.25.1 h1:wdKvqQk7IttEw92GoRyKG2IDrUIpgpj6H6m81yfeMW0= google.golang.org/grpc v1.25.1/go.mod h1:c3i+UQWmh7LiEpx4sFZnkU36qjEYZ0imhYfXVyQciAY= gopkg.in/airbrake/gobrake.v2 v2.0.9/go.mod h1:/h5ZAUhDkGaJfjzjKLSjv6zCL6O0LLBxU4K+aSYdM/U= @@ -629,6 +639,7 @@ gopkg.in/yaml.v2 v2.0.0-20170812160011-eb3733d160e7/go.mod h1:JAlM8MvJe8wmxCU4Bl gopkg.in/yaml.v2 v2.2.1/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= gopkg.in/yaml.v2 v2.2.2 h1:ZCJp+EgiOT7lHqUV2J862kp8Qj64Jo6az82+3Td9dZw= gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= +gopkg.in/yaml.v2 v2.2.3/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= gopkg.in/yaml.v2 v2.2.4 h1:/eiJrUcujPVeJ3xlSWaiNi3uSVmDGBK1pDHUHAnao1I= gopkg.in/yaml.v2 v2.2.4/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= honnef.co/go/tools v0.0.0-20190102054323-c2f93a96b099/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4= diff --git a/pkg/backup/client.go b/pkg/backup/client.go index 49e48638d..6d6eff033 100644 --- a/pkg/backup/client.go +++ b/pkg/backup/client.go @@ -20,6 +20,7 @@ import ( "github.com/pingcap/tidb/distsql" "github.com/pingcap/tidb/domain" "github.com/pingcap/tidb/kv" + "github.com/pingcap/tidb/meta" "github.com/pingcap/tidb/meta/autoid" "github.com/pingcap/tidb/store/tikv" "github.com/pingcap/tidb/store/tikv/oracle" @@ -119,7 +120,12 @@ func (bc *Client) SetStorage(ctx context.Context, backend *backup.StorageBackend } // SaveBackupMeta saves the current backup meta at the given path. -func (bc *Client) SaveBackupMeta(ctx context.Context) error { +func (bc *Client) SaveBackupMeta(ctx context.Context, ddlJobs []*model.Job) error { + ddlJobsData, err := json.Marshal(ddlJobs) + if err != nil { + return errors.Trace(err) + } + bc.backupMeta.Ddls = ddlJobsData backupMetaData, err := proto.Marshal(&bc.backupMeta) if err != nil { return errors.Trace(err) @@ -127,7 +133,7 @@ func (bc *Client) SaveBackupMeta(ctx context.Context) error { log.Debug("backup meta", zap.Reflect("meta", bc.backupMeta)) backendURL := storage.FormatBackendURL(bc.backend) - log.Info("save backup meta", zap.Stringer("path", &backendURL)) + log.Info("save backup meta", zap.Stringer("path", &backendURL), zap.Int("jobs", len(ddlJobs))) return bc.storage.Write(ctx, utils.MetaFile, backupMetaData) } @@ -241,6 +247,51 @@ func BuildBackupRangeAndSchema( return ranges, backupSchemas, nil } +// GetBackupDDLJobs returns the ddl jobs are done in (lastBackupTS, backupTS] +func GetBackupDDLJobs(dom *domain.Domain, lastBackupTS, backupTS uint64) ([]*model.Job, error) { + snapMeta, err := dom.GetSnapshotMeta(backupTS) + if err != nil { + return nil, errors.Trace(err) + } + lastSnapMeta, err := dom.GetSnapshotMeta(lastBackupTS) + if err != nil { + return nil, errors.Trace(err) + } + lastSchemaVersion, err := lastSnapMeta.GetSchemaVersion() + if err != nil { + return nil, errors.Trace(err) + } + allJobs := make([]*model.Job, 0) + defaultJobs, err := snapMeta.GetAllDDLJobsInQueue(meta.DefaultJobListKey) + if err != nil { + return nil, errors.Trace(err) + } + log.Debug("get default jobs", zap.Int("jobs", len(defaultJobs))) + allJobs = append(allJobs, defaultJobs...) + addIndexJobs, err := snapMeta.GetAllDDLJobsInQueue(meta.AddIndexJobListKey) + if err != nil { + return nil, errors.Trace(err) + } + log.Debug("get add index jobs", zap.Int("jobs", len(addIndexJobs))) + allJobs = append(allJobs, addIndexJobs...) + historyJobs, err := snapMeta.GetAllHistoryDDLJobs() + if err != nil { + return nil, errors.Trace(err) + } + log.Debug("get history jobs", zap.Int("jobs", len(historyJobs))) + allJobs = append(allJobs, historyJobs...) + + completedJobs := make([]*model.Job, 0) + for _, job := range allJobs { + if (job.State == model.JobStateDone || job.State == model.JobStateSynced) && + (job.BinlogInfo != nil && job.BinlogInfo.SchemaVersion > lastSchemaVersion) { + completedJobs = append(completedJobs, job) + } + } + log.Debug("get completed jobs", zap.Int("jobs", len(completedJobs))) + return completedJobs, nil +} + // BackupRanges make a backup of the given key ranges. func (bc *Client) BackupRanges( ctx context.Context, diff --git a/pkg/checksum/executor.go b/pkg/checksum/executor.go index 2ca5cf66d..30e8f11c8 100644 --- a/pkg/checksum/executor.go +++ b/pkg/checksum/executor.go @@ -61,7 +61,7 @@ func buildChecksumRequest( reqs := make([]*kv.Request, 0, (len(newTable.Indices)+1)*(len(partDefs)+1)) var oldTableID int64 if oldTable != nil { - oldTableID = oldTable.Schema.ID + oldTableID = oldTable.Info.ID } rs, err := buildRequest(newTable, newTable.ID, oldTable, oldTableID, startTS) if err != nil { @@ -72,7 +72,7 @@ func buildChecksumRequest( for _, partDef := range partDefs { var oldPartID int64 if oldTable != nil { - for _, oldPartDef := range oldTable.Schema.Partition.Definitions { + for _, oldPartDef := range oldTable.Info.Partition.Definitions { if oldPartDef.Name == partDef.Name { oldPartID = oldPartDef.ID } @@ -108,7 +108,7 @@ func buildRequest( } var oldIndexInfo *model.IndexInfo if oldTable != nil { - for _, oldIndex := range oldTable.Schema.Indices { + for _, oldIndex := range oldTable.Info.Indices { if oldIndex.Name == indexInfo.Name { oldIndexInfo = oldIndex break @@ -117,7 +117,7 @@ func buildRequest( if oldIndexInfo == nil { log.Panic("index not found", zap.Reflect("table", tableInfo), - zap.Reflect("oldTable", oldTable.Schema), + zap.Reflect("oldTable", oldTable.Info), zap.Stringer("index", indexInfo.Name)) } } diff --git a/pkg/checksum/executor_test.go b/pkg/checksum/executor_test.go index ca68628e2..3e6d8078c 100644 --- a/pkg/checksum/executor_test.go +++ b/pkg/checksum/executor_test.go @@ -83,7 +83,7 @@ func (s *testChecksumSuite) TestChecksum(c *C) { // Test rewrite rules tk.MustExec("alter table t1 add index i2(a);") tableInfo1 = s.getTableInfo(c, "test", "t1") - oldTable := utils.Table{Schema: tableInfo1} + oldTable := utils.Table{Info: tableInfo1} exe2, err = NewExecutorBuilder(tableInfo2, math.MaxUint64). SetOldTable(&oldTable).Build() c.Assert(err, IsNil) diff --git a/pkg/restore/client.go b/pkg/restore/client.go index 5402d78bc..f45b3d510 100644 --- a/pkg/restore/client.go +++ b/pkg/restore/client.go @@ -2,7 +2,9 @@ package restore import ( "context" + "encoding/json" "math" + "sort" "sync" "time" @@ -40,6 +42,7 @@ type Client struct { tableWorkerPool *utils.WorkerPool databases map[string]*utils.Database + ddlJobs []*model.Job backupMeta *backup.BackupMeta db *DB rateLimit uint64 @@ -97,8 +100,15 @@ func (rc *Client) InitBackupMeta(backupMeta *backup.BackupMeta, backend *backup. if err != nil { return errors.Trace(err) } + var ddlJobs []*model.Job + err = json.Unmarshal(backupMeta.GetDdls(), &ddlJobs) + if err != nil { + return errors.Trace(err) + } rc.databases = databases + rc.ddlJobs = ddlJobs rc.backupMeta = backupMeta + log.Info("load backupmeta", zap.Int("databases", len(rc.databases)), zap.Int("jobs", len(rc.ddlJobs))) metaClient := NewSplitClient(rc.pdClient) importClient := NewImportClient(metaClient) @@ -151,6 +161,11 @@ func (rc *Client) GetDatabase(name string) *utils.Database { return rc.databases[name] } +// GetDDLJobs returns ddl jobs +func (rc *Client) GetDDLJobs() []*model.Job { + return rc.ddlJobs +} + // GetTableSchema returns the schema of a table from TiDB. func (rc *Client) GetTableSchema( dom *domain.Domain, @@ -189,11 +204,11 @@ func (rc *Client) CreateTables( if err != nil { return nil, nil, err } - newTableInfo, err := rc.GetTableSchema(dom, table.Db.Name, table.Schema.Name) + newTableInfo, err := rc.GetTableSchema(dom, table.Db.Name, table.Info.Name) if err != nil { return nil, nil, err } - rules := GetRewriteRules(newTableInfo, table.Schema, newTS) + rules := GetRewriteRules(newTableInfo, table.Info, newTS) rewriteRules.Table = append(rewriteRules.Table, rules.Table...) rewriteRules.Data = append(rewriteRules.Data, rules.Data...) newTables = append(newTables, newTableInfo) @@ -201,6 +216,26 @@ func (rc *Client) CreateTables( return rewriteRules, newTables, nil } +// ExecDDLs executes the queries of the ddl jobs. +func (rc *Client) ExecDDLs(ddlJobs []*model.Job) error { + // Sort the ddl jobs by schema version in ascending order. + sort.Slice(ddlJobs, func(i, j int) bool { + return ddlJobs[i].BinlogInfo.SchemaVersion < ddlJobs[j].BinlogInfo.SchemaVersion + }) + + for _, job := range ddlJobs { + err := rc.db.ExecDDL(rc.ctx, job) + if err != nil { + return errors.Trace(err) + } + log.Info("execute ddl query", + zap.String("db", job.SchemaName), + zap.String("query", job.Query), + zap.Int64("historySchemaVersion", job.BinlogInfo.SchemaVersion)) + } + return nil +} + func (rc *Client) setSpeedLimit() error { if !rc.hasSpeedLimited && rc.rateLimit != 0 { stores, err := rc.pdClient.GetAllStores(rc.ctx, pd.WithExcludeTombstone()) @@ -380,7 +415,7 @@ func (rc *Client) ValidateChecksum( checksumResp.TotalBytes != table.TotalBytes { log.Error("failed in validate checksum", zap.String("database", table.Db.Name.L), - zap.String("table", table.Schema.Name.L), + zap.String("table", table.Info.Name.L), zap.Uint64("origin tidb crc64", table.Crc64Xor), zap.Uint64("calculated crc64", checksumResp.Checksum), zap.Uint64("origin tidb total kvs", table.TotalKvs), diff --git a/pkg/restore/client_test.go b/pkg/restore/client_test.go index 5007f1281..3d608b3b9 100644 --- a/pkg/restore/client_test.go +++ b/pkg/restore/client_test.go @@ -52,7 +52,7 @@ func (s *testRestoreClientSuite) TestCreateTables(c *C) { for i := len(tables) - 1; i >= 0; i-- { tables[i] = &utils.Table{ Db: dbSchema, - Schema: &model.TableInfo{ + Info: &model.TableInfo{ ID: int64(i), Name: model.NewCIStr("test" + strconv.Itoa(i)), Columns: []*model.ColumnInfo{{ diff --git a/pkg/restore/db.go b/pkg/restore/db.go index b114b7629..8c09af16f 100644 --- a/pkg/restore/db.go +++ b/pkg/restore/db.go @@ -4,6 +4,7 @@ import ( "bytes" "context" "fmt" + "sort" "strings" "github.com/pingcap/errors" @@ -38,6 +39,31 @@ func NewDB(store kv.Storage) (*DB, error) { }, nil } +// ExecDDL executes the query of a ddl job. +func (db *DB) ExecDDL(ctx context.Context, ddlJob *model.Job) error { + var err error + if ddlJob.BinlogInfo.TableInfo != nil { + switchDbSQL := fmt.Sprintf("use %s;", ddlJob.SchemaName) + _, err = db.se.Execute(ctx, switchDbSQL) + if err != nil { + log.Error("switch db failed", + zap.String("query", switchDbSQL), + zap.String("db", ddlJob.SchemaName), + zap.Error(err)) + return errors.Trace(err) + } + } + _, err = db.se.Execute(ctx, ddlJob.Query) + if err != nil { + log.Error("execute ddl query failed", + zap.String("query", ddlJob.Query), + zap.String("db", ddlJob.SchemaName), + zap.Int64("historySchemaVersion", ddlJob.BinlogInfo.SchemaVersion), + zap.Error(err)) + } + return errors.Trace(err) +} + // CreateDatabase executes a CREATE DATABASE SQL. func (db *DB) CreateDatabase(ctx context.Context, schema *model.DBInfo) error { var buf bytes.Buffer @@ -49,16 +75,15 @@ func (db *DB) CreateDatabase(ctx context.Context, schema *model.DBInfo) error { createSQL := buf.String() _, err = db.se.Execute(ctx, createSQL) if err != nil { - log.Error("create database failed", zap.String("SQL", createSQL), zap.Error(err)) - return errors.Trace(err) + log.Error("create database failed", zap.String("query", createSQL), zap.Error(err)) } - return nil + return errors.Trace(err) } // CreateTable executes a CREATE TABLE SQL. func (db *DB) CreateTable(ctx context.Context, table *utils.Table) error { var buf bytes.Buffer - schema := table.Schema + schema := table.Info err := executor.ConstructResultOfShowCreateTable(db.se, schema, newIDAllocator(schema.AutoIncID), &buf) if err != nil { log.Error( @@ -88,7 +113,7 @@ func (db *DB) CreateTable(ctx context.Context, table *utils.Table) error { log.Error("create table failed", zap.String("SQL", createSQL), zap.Stringer("db", table.Db.Name), - zap.Stringer("table", table.Schema.Name), + zap.Stringer("table", table.Info.Name), zap.Error(err)) return errors.Trace(err) } @@ -99,16 +124,76 @@ func (db *DB) CreateTable(ctx context.Context, table *utils.Table) error { _, err = db.se.Execute(ctx, alterAutoIncIDSQL) if err != nil { log.Error("alter AutoIncID failed", - zap.String("SQL", alterAutoIncIDSQL), + zap.String("query", alterAutoIncIDSQL), zap.Stringer("db", table.Db.Name), - zap.Stringer("table", table.Schema.Name), + zap.Stringer("table", table.Info.Name), zap.Error(err)) - return errors.Trace(err) } - return nil + return errors.Trace(err) } // Close closes the connection func (db *DB) Close() { db.se.Close() } + +// FilterDDLJobs filters ddl jobs +func FilterDDLJobs(allDDLJobs []*model.Job, tables []*utils.Table) (ddlJobs []*model.Job) { + // Sort the ddl jobs by schema version in descending order. + sort.Slice(allDDLJobs, func(i, j int) bool { + return allDDLJobs[i].BinlogInfo.SchemaVersion > allDDLJobs[j].BinlogInfo.SchemaVersion + }) + dbs := getDatabases(tables) + for _, db := range dbs { + // These maps is for solving some corner case. + // e.g. let "t=2" indicates that the id of database "t" is 2, if the ddl execution sequence is: + // rename "a" to "b"(a=1) -> drop "b"(b=1) -> create "b"(b=2) -> rename "b" to "a"(a=2) + // Which we cannot find the "create" DDL by name and id directly. + // To cover †his case, we must find all names and ids the database/table ever had. + dbIDs := make(map[int64]bool) + dbIDs[db.ID] = true + dbNames := make(map[string]bool) + dbNames[db.Name.String()] = true + for _, job := range allDDLJobs { + if job.BinlogInfo.DBInfo != nil { + if dbIDs[job.SchemaID] || dbNames[job.BinlogInfo.DBInfo.Name.String()] { + ddlJobs = append(ddlJobs, job) + // The the jobs executed with the old id, like the step 2 in the example above. + dbIDs[job.SchemaID] = true + // For the jobs executed after rename, like the step 3 in the example above. + dbNames[job.BinlogInfo.DBInfo.Name.String()] = true + } + } + } + } + + for _, table := range tables { + tableIDs := make(map[int64]bool) + tableIDs[table.Info.ID] = true + tableNames := make(map[string]bool) + tableNames[table.Info.Name.String()] = true + for _, job := range allDDLJobs { + if job.BinlogInfo.TableInfo != nil { + if tableIDs[job.TableID] || tableNames[job.BinlogInfo.TableInfo.Name.String()] { + ddlJobs = append(ddlJobs, job) + tableIDs[job.TableID] = true + // For truncate table, the id may be changed + tableIDs[job.BinlogInfo.TableInfo.ID] = true + tableNames[job.BinlogInfo.TableInfo.Name.String()] = true + } + } + } + } + return ddlJobs +} + +func getDatabases(tables []*utils.Table) (dbs []*model.DBInfo) { + dbIDs := make(map[int64]bool) + for _, table := range tables { + if !dbIDs[table.Db.ID] { + dbs = append(dbs, table.Db) + dbIDs[table.Db.ID] = true + } + } + return +} diff --git a/pkg/restore/db_test.go b/pkg/restore/db_test.go index 98341f510..0151b4da6 100644 --- a/pkg/restore/db_test.go +++ b/pkg/restore/db_test.go @@ -12,6 +12,7 @@ import ( "github.com/pingcap/tidb/util/testkit" "github.com/pingcap/tidb/util/testleak" + "github.com/pingcap/br/pkg/backup" "github.com/pingcap/br/pkg/utils" ) @@ -25,19 +26,18 @@ func (s *testRestoreSchemaSuite) SetUpSuite(c *C) { var err error s.mock, err = utils.NewMockCluster() c.Assert(err, IsNil) + c.Assert(s.mock.Start(), IsNil) } func TestT(t *testing.T) { TestingT(t) } func (s *testRestoreSchemaSuite) TearDownSuite(c *C) { + s.mock.Stop() testleak.AfterTest(c)() } func (s *testRestoreSchemaSuite) TestRestoreAutoIncID(c *C) { - c.Assert(s.mock.Start(), IsNil) - defer s.mock.Stop() - tk := testkit.NewTestKit(c, s.mock.Storage) tk.MustExec("use test") tk.MustExec("set @@sql_mode=''") @@ -60,16 +60,16 @@ func (s *testRestoreSchemaSuite) TestRestoreAutoIncID(c *C) { tableInfo, err := info.TableByName(model.NewCIStr("test"), model.NewCIStr("\"t\"")) c.Assert(err, IsNil, Commentf("Error get table info: %s", err)) table := utils.Table{ - Schema: tableInfo.Meta(), - Db: dbInfo, + Info: tableInfo.Meta(), + Db: dbInfo, } // Get the next AutoIncID idAlloc := autoid.NewAllocator(s.mock.Storage, dbInfo.ID, false, autoid.RowIDAllocType) - globalAutoID, err := idAlloc.NextGlobalAutoID(table.Schema.ID) + globalAutoID, err := idAlloc.NextGlobalAutoID(table.Info.ID) c.Assert(err, IsNil, Commentf("Error allocate next auto id")) c.Assert(autoIncID, Equals, uint64(globalAutoID)) // Alter AutoIncID to the next AutoIncID + 100 - table.Schema.AutoIncID = globalAutoID + 100 + table.Info.AutoIncID = globalAutoID + 100 db, err := NewDB(s.mock.Storage) c.Assert(err, IsNil, Commentf("Error create DB")) tk.MustExec("drop database if exists test;") @@ -92,3 +92,39 @@ func (s *testRestoreSchemaSuite) TestRestoreAutoIncID(c *C) { c.Assert(err, IsNil, Commentf("Error query auto inc id: %s", err)) c.Assert(autoIncID, Equals, uint64(globalAutoID+100)) } + +func (s *testRestoreSchemaSuite) TestFilterDDLJobs(c *C) { + tk := testkit.NewTestKit(c, s.mock.Storage) + tk.MustExec("CREATE DATABASE IF NOT EXISTS test_db;") + tk.MustExec("CREATE TABLE IF NOT EXISTS test_db.test_table (c1 INT);") + lastTs, err := s.mock.GetOracle().GetTimestamp(context.Background()) + c.Assert(err, IsNil, Commentf("Error get last ts: %s", err)) + tk.MustExec("RENAME TABLE test_db.test_table to test_db.test_table1;") + tk.MustExec("DROP TABLE test_db.test_table1;") + tk.MustExec("DROP DATABASE test_db;") + tk.MustExec("CREATE DATABASE test_db;") + tk.MustExec("USE test_db;") + tk.MustExec("CREATE TABLE test_table1 (c2 CHAR(255));") + tk.MustExec("RENAME TABLE test_table1 to test_table;") + tk.MustExec("TRUNCATE TABLE test_table;") + + ts, err := s.mock.GetOracle().GetTimestamp(context.Background()) + c.Assert(err, IsNil, Commentf("Error get ts: %s", err)) + allDDLJobs, err := backup.GetBackupDDLJobs(s.mock.Domain, lastTs, ts) + c.Assert(err, IsNil, Commentf("Error get ddl jobs: %s", err)) + infoSchema, err := s.mock.Domain.GetSnapshotInfoSchema(ts) + c.Assert(err, IsNil, Commentf("Error get snapshot info schema: %s", err)) + dbInfo, ok := infoSchema.SchemaByName(model.NewCIStr("test_db")) + c.Assert(ok, IsTrue, Commentf("DB info not exist")) + tableInfo, err := infoSchema.TableByName(model.NewCIStr("test_db"), model.NewCIStr("test_table")) + c.Assert(err, IsNil, Commentf("Error get table info: %s", err)) + tables := []*utils.Table{{ + Db: dbInfo, + Info: tableInfo.Meta(), + }} + ddlJobs := FilterDDLJobs(allDDLJobs, tables) + for _, job := range ddlJobs { + c.Logf("get ddl job: %s", job.Query) + } + c.Assert(len(ddlJobs), Equals, 7) +} diff --git a/pkg/restore/split.go b/pkg/restore/split.go index 3248fdd0d..378e256c6 100644 --- a/pkg/restore/split.go +++ b/pkg/restore/split.go @@ -3,12 +3,14 @@ package restore import ( "bytes" "context" + "strings" "time" "github.com/pingcap/errors" "github.com/pingcap/kvproto/pkg/import_sstpb" "github.com/pingcap/kvproto/pkg/pdpb" "github.com/pingcap/log" + "github.com/pingcap/tidb/tablecodec" "github.com/pingcap/tidb/util/codec" "go.uber.org/zap" ) @@ -103,8 +105,18 @@ SplitRegions: } for regionID, keys := range splitKeyMap { var newRegions []*RegionInfo - newRegions, err = rs.splitAndScatterRegions(ctx, regionMap[regionID], keys) + region := regionMap[regionID] + newRegions, err = rs.splitAndScatterRegions(ctx, region, keys) if err != nil { + if strings.Contains(err.Error(), "no valid key") { + for _, key := range keys { + log.Error("no valid key", + zap.Binary("startKey", region.Region.StartKey), + zap.Binary("endKey", region.Region.EndKey), + zap.Binary("key", codec.EncodeBytes([]byte{}, key))) + } + return errors.Trace(err) + } interval = 2 * interval if interval > SplitMaxRetryInterval { interval = SplitMaxRetryInterval @@ -115,6 +127,7 @@ SplitRegions: } continue SplitRegions } + log.Debug("split regions", zap.Stringer("region", region.Region), zap.ByteStrings("keys", keys)) scatterRegions = append(scatterRegions, newRegions...) onSplit(keys) } @@ -250,7 +263,7 @@ func getSplitKeys(rewriteRules *RewriteRules, ranges []Range, regions []*RegionI checkKeys = append(checkKeys, rule.GetNewKeyPrefix()) } for _, rg := range ranges { - checkKeys = append(checkKeys, rg.EndKey) + checkKeys = append(checkKeys, truncateRowKey(rg.EndKey)) } for _, key := range checkKeys { if region := needSplit(key, regions); region != nil { @@ -259,7 +272,10 @@ func getSplitKeys(rewriteRules *RewriteRules, ranges []Range, regions []*RegionI splitKeys = make([][]byte, 0, 1) } splitKeyMap[region.Region.GetId()] = append(splitKeys, key) - log.Debug("get key for split region", zap.Binary("key", key), zap.Stringer("region", region.Region)) + log.Debug("get key for split region", + zap.Binary("key", key), + zap.Binary("startKey", region.Region.StartKey), + zap.Binary("endKey", region.Region.EndKey)) } } return splitKeyMap @@ -285,6 +301,21 @@ func needSplit(splitKey []byte, regions []*RegionInfo) *RegionInfo { return nil } +var ( + tablePrefix = []byte{'t'} + idLen = 8 + recordPrefix = []byte("_r") +) + +func truncateRowKey(key []byte) []byte { + if bytes.HasPrefix(key, tablePrefix) && + len(key) > tablecodec.RecordRowKeyLen && + bytes.HasPrefix(key[len(tablePrefix)+idLen:], recordPrefix) { + return key[:tablecodec.RecordRowKeyLen] + } + return key +} + func beforeEnd(key []byte, end []byte) bool { return bytes.Compare(key, end) < 0 || len(end) == 0 } diff --git a/pkg/restore/split_test.go b/pkg/restore/split_test.go index 509c4cfa0..3ace5b8c8 100644 --- a/pkg/restore/split_test.go +++ b/pkg/restore/split_test.go @@ -280,7 +280,7 @@ func validateRegions(regions map[uint64]*RegionInfo) bool { return false } FindRegion: - for i := 1; i < 12; i++ { + for i := 1; i < len(keys); i++ { for _, region := range regions { startKey := []byte(keys[i-1]) if len(startKey) != 0 { @@ -299,3 +299,26 @@ FindRegion: } return true } + +func (s *testRestoreUtilSuite) TestNeedSplit(c *C) { + regions := []*RegionInfo{ + { + Region: &metapb.Region{ + StartKey: codec.EncodeBytes([]byte{}, []byte("b")), + EndKey: codec.EncodeBytes([]byte{}, []byte("d")), + }, + }, + } + // Out of region + c.Assert(needSplit([]byte("a"), regions), IsNil) + // Region start key + c.Assert(needSplit([]byte("b"), regions), IsNil) + // In region + region := needSplit([]byte("c"), regions) + c.Assert(bytes.Compare(region.Region.GetStartKey(), codec.EncodeBytes([]byte{}, []byte("b"))), Equals, 0) + c.Assert(bytes.Compare(region.Region.GetEndKey(), codec.EncodeBytes([]byte{}, []byte("d"))), Equals, 0) + // Region end key + c.Assert(needSplit([]byte("d"), regions), IsNil) + // Out of region + c.Assert(needSplit([]byte("e"), regions), IsNil) +} diff --git a/pkg/task/backup.go b/pkg/task/backup.go index b9613cd56..240754517 100644 --- a/pkg/task/backup.go +++ b/pkg/task/backup.go @@ -6,8 +6,10 @@ import ( "github.com/pingcap/errors" "github.com/pingcap/log" + "github.com/pingcap/parser/model" "github.com/pingcap/tidb-tools/pkg/filter" "github.com/spf13/pflag" + "go.uber.org/zap" "github.com/pingcap/br/pkg/backup" "github.com/pingcap/br/pkg/storage" @@ -98,6 +100,19 @@ func RunBackup(c context.Context, cmdName string, cfg *BackupConfig) error { return err } + ddlJobs := make([]*model.Job, 0) + if cfg.LastBackupTS > 0 { + err = backup.CheckGCSafepoint(ctx, mgr.GetPDClient(), cfg.LastBackupTS) + if err != nil { + log.Error("Check gc safepoint for last backup ts failed", zap.Error(err)) + return err + } + ddlJobs, err = backup.GetBackupDDLJobs(mgr.GetDomain(), cfg.LastBackupTS, backupTS) + if err != nil { + return err + } + } + // The number of regions need to backup approximateRegions := 0 for _, r := range ranges { @@ -139,17 +154,25 @@ func RunBackup(c context.Context, cmdName string, cfg *BackupConfig) error { return err } - valid, err := client.FastChecksum() - if err != nil { - return err - } - if !valid { - log.Error("backup FastChecksum mismatch!") + if cfg.LastBackupTS == 0 { + var valid bool + valid, err = client.FastChecksum() + if err != nil { + return err + } + if !valid { + log.Error("backup FastChecksum mismatch!") + return errors.Errorf("mismatched checksum") + } + + } else { + // Since we don't support checksum for incremental data, fast checksum should be skipped. + log.Info("Skip fast checksum in incremental backup") } // Checksum has finished close(updateCh) - err = client.SaveBackupMeta(ctx) + err = client.SaveBackupMeta(ctx, ddlJobs) if err != nil { return err } diff --git a/pkg/task/restore.go b/pkg/task/restore.go index a56a1d6da..599dcb478 100644 --- a/pkg/task/restore.go +++ b/pkg/task/restore.go @@ -103,6 +103,14 @@ func RunRestore(c context.Context, cmdName string, cfg *RestoreConfig) error { return err } } + ddlJobs := restore.FilterDDLJobs(client.GetDDLJobs(), tables) + if err != nil { + return err + } + err = client.ExecDDLs(ddlJobs) + if err != nil { + return errors.Trace(err) + } rewriteRules, newTables, err := client.CreateTables(mgr.GetDomain(), tables, newTS) if err != nil { return err @@ -178,12 +186,12 @@ func filterRestoreFiles( for _, db := range client.GetDatabases() { createdDatabase := false for _, table := range db.Tables { - if !tableFilter.Match(&filter.Table{Schema: db.Schema.Name.O, Name: table.Schema.Name.O}) { + if !tableFilter.Match(&filter.Table{Schema: db.Info.Name.O, Name: table.Info.Name.O}) { continue } if !createdDatabase { - if err = client.CreateDatabase(db.Schema); err != nil { + if err = client.CreateDatabase(db.Info); err != nil { return nil, nil, err } createdDatabase = true diff --git a/pkg/utils/schema.go b/pkg/utils/schema.go index 67d28132f..0afe98e5b 100644 --- a/pkg/utils/schema.go +++ b/pkg/utils/schema.go @@ -24,7 +24,7 @@ const ( // Table wraps the schema and files of a table. type Table struct { Db *model.DBInfo - Schema *model.TableInfo + Info *model.TableInfo Crc64Xor uint64 TotalKvs uint64 TotalBytes uint64 @@ -33,14 +33,14 @@ type Table struct { // Database wraps the schema and tables of a database. type Database struct { - Schema *model.DBInfo + Info *model.DBInfo Tables []*Table } // GetTable returns a table of the database by name. func (db *Database) GetTable(name string) *Table { for _, table := range db.Tables { - if table.Schema.Name.String() == name { + if table.Info.Name.String() == name { return table } } @@ -61,7 +61,7 @@ func LoadBackupTables(meta *backup.BackupMeta) (map[string]*Database, error) { db, ok := databases[dbInfo.Name.String()] if !ok { db = &Database{ - Schema: dbInfo, + Info: dbInfo, Tables: make([]*Table, 0), } databases[dbInfo.Name.String()] = db @@ -94,7 +94,7 @@ func LoadBackupTables(meta *backup.BackupMeta) (map[string]*Database, error) { } table := &Table{ Db: dbInfo, - Schema: tableInfo, + Info: tableInfo, Crc64Xor: schema.Crc64Xor, TotalKvs: schema.TotalKvs, TotalBytes: schema.TotalBytes, diff --git a/tests/br_full_ddl/run.sh b/tests/br_full_ddl/run.sh index 1e40415d7..e50ef1ecf 100755 --- a/tests/br_full_ddl/run.sh +++ b/tests/br_full_ddl/run.sh @@ -36,7 +36,7 @@ done # backup full echo "backup start..." -br --pd $PD_ADDR backup full -s "local://$TEST_DIR/$DB" --ratelimit 5 --concurrency 4 --log-file $LOG +run_br --pd $PD_ADDR backup full -s "local://$TEST_DIR/$DB" --ratelimit 5 --concurrency 4 --log-file $LOG checksum_count=$(cat $LOG | grep "fast checksum success" | wc -l | xargs) @@ -50,7 +50,7 @@ run_sql "DROP DATABASE $DB;" # restore full echo "restore start..." -br restore full -s "local://$TEST_DIR/$DB" --pd $PD_ADDR +run_br restore full -s "local://$TEST_DIR/$DB" --pd $PD_ADDR row_count_new=$(run_sql "SELECT COUNT(*) FROM $DB.$TABLE;" | awk '/COUNT/{print $2}') diff --git a/tests/br_incremental/run.sh b/tests/br_incremental/run.sh index bb6a42efb..b6a6061de 100755 --- a/tests/br_incremental/run.sh +++ b/tests/br_incremental/run.sh @@ -20,55 +20,38 @@ TABLE="usertable" run_sql "CREATE DATABASE $DB;" go-ycsb load mysql -P tests/$TEST_NAME/workload -p mysql.host=$TIDB_IP -p mysql.port=$TIDB_PORT -p mysql.user=root -p mysql.db=$DB - -row_count_ori=$(run_sql "SELECT COUNT(*) FROM $DB.$TABLE;" | awk '/COUNT/{print $2}') +row_count_ori_full=$(run_sql "SELECT COUNT(*) FROM $DB.$TABLE;" | awk '/COUNT/{print $2}') # full backup echo "full backup start..." -run_br --pd $PD_ADDR backup table -s "local://$TEST_DIR/$DB" --db $DB -t $TABLE --ratelimit 5 --concurrency 4 - -run_sql "DROP TABLE $DB.$TABLE;" - -# full restore -echo "full restore start..." -run_br restore table --db $DB --table $TABLE -s "local://$TEST_DIR/$DB" --pd $PD_ADDR - -row_count_new=$(run_sql "SELECT COUNT(*) FROM $DB.$TABLE;" | awk '/COUNT/{print $2}') - -if [ "$row_count_ori" -ne "$row_count_new" ];then - echo "TEST: [$TEST_NAME] full br failed!" - exit 1 -fi +run_br --pd $PD_ADDR backup table -s "local://$TEST_DIR/$DB/full" --db $DB -t $TABLE --ratelimit 5 --concurrency 4 go-ycsb run mysql -P tests/$TEST_NAME/workload -p mysql.host=$TIDB_IP -p mysql.port=$TIDB_PORT -p mysql.user=root -p mysql.db=$DB -row_count_ori=$(run_sql "SELECT COUNT(*) FROM $DB.$TABLE;" | awk '/COUNT/{print $2}') -last_backup_ts=$(br validate decode --field="end-version" -s "local://$TEST_DIR/$DB" | tail -n1) - -# clean up data -rm -rf $TEST_DIR/$DB - # incremental backup echo "incremental backup start..." -run_br --pd $PD_ADDR backup table -s "local://$TEST_DIR/$DB" --db $DB -t $TABLE --ratelimit 5 --concurrency 4 --lastbackupts $last_backup_ts - -start_ts=$(br validate decode --field="start-version" -s "local://$TEST_DIR/$DB" | tail -n1) -end_ts=$(br validate decode --field="end-version" -s "local://$TEST_DIR/$DB" | tail -n1) +last_backup_ts=$(br validate decode --field="end-version" -s "local://$TEST_DIR/$DB/full" | tail -n1) +run_br --pd $PD_ADDR backup table -s "local://$TEST_DIR/$DB/inc" --db $DB -t $TABLE --ratelimit 5 --concurrency 4 --lastbackupts $last_backup_ts +row_count_ori_inc=$(run_sql "SELECT COUNT(*) FROM $DB.$TABLE;" | awk '/COUNT/{print $2}') -echo "start version: $start_ts, end version: $end_ts" +run_sql "DROP DATABASE $DB;" +# full restore +echo "full restore start..." +run_br restore table --db $DB --table $TABLE -s "local://$TEST_DIR/$DB/full" --pd $PD_ADDR +row_count_full=$(run_sql "SELECT COUNT(*) FROM $DB.$TABLE;" | awk '/COUNT/{print $2}') +# check full restore +if [ "${row_count_full}" != "${row_count_ori_full}" ];then + echo "TEST: [$TEST_NAME] full restore fail on database $DB" + exit 1 +fi # incremental restore echo "incremental restore start..." -run_br restore full -s "local://$TEST_DIR/$DB" --pd $PD_ADDR - -row_count_new=$(run_sql "SELECT COUNT(*) FROM $DB.$TABLE;" | awk '/COUNT/{print $2}') - -echo "[original] row count: $row_count_ori, [after br] row count: $row_count_new" - -if [ "$row_count_ori" -eq "$row_count_new" ];then - echo "TEST: [$TEST_NAME] successed!" -else - echo "TEST: [$TEST_NAME] failed!" +run_br restore table --db $DB --table $TABLE -s "local://$TEST_DIR/$DB/inc" --pd $PD_ADDR +row_count_inc=$(run_sql "SELECT COUNT(*) FROM $DB.$TABLE;" | awk '/COUNT/{print $2}') +# check full restore +if [ "${row_count_inc}" != "${row_count_ori_inc}" ];then + echo "TEST: [$TEST_NAME] incremental restore fail on database $DB" exit 1 fi diff --git a/tests/br_incremental_ddl/run.sh b/tests/br_incremental_ddl/run.sh new file mode 100755 index 000000000..d9a88709b --- /dev/null +++ b/tests/br_incremental_ddl/run.sh @@ -0,0 +1,74 @@ +#!/bin/sh +# +# Copyright 2019 PingCAP, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# See the License for the specific language governing permissions and +# limitations under the License. + +set -eu +DB="$TEST_NAME" +TABLE="usertable" +ROW_COUNT=100 +PATH="tests/$TEST_NAME:bin:$PATH" + +echo "load data..." +# create database +run_sql "CREATE DATABASE IF NOT EXISTS $DB;" +# create table +run_sql "CREATE TABLE IF NOT EXISTS ${DB}.${TABLE} (c1 INT);" +# insert records +for i in $(seq $ROW_COUNT); do + run_sql "INSERT INTO ${DB}.${TABLE}(c1) VALUES ($i);" +done + +# full backup +echo "full backup start..." +run_br --pd $PD_ADDR backup table -s "local://$TEST_DIR/$DB/full" --db $DB -t $TABLE --ratelimit 5 --concurrency 4 +# run ddls +echo "run ddls..." +run_sql "RENAME TABLE ${DB}.${TABLE} to ${DB}.${TABLE}1;" +run_sql "DROP TABLE ${DB}.${TABLE}1;" +run_sql "DROP DATABASE ${DB};" +run_sql "CREATE DATABASE ${DB};" +run_sql "CREATE TABLE ${DB}.${TABLE}1 (c2 CHAR(255));" +run_sql "RENAME TABLE ${DB}.${TABLE}1 to ${DB}.${TABLE};" +run_sql "TRUNCATE TABLE ${DB}.${TABLE};" +# insert records +for i in $(seq $ROW_COUNT); do + run_sql "INSERT INTO ${DB}.${TABLE}(c2) VALUES ('$i');" +done +# incremental backup +echo "incremental backup start..." +last_backup_ts=$(br validate decode --field="end-version" -s "local://$TEST_DIR/$DB/full" | tail -n1) +run_br --pd $PD_ADDR backup table -s "local://$TEST_DIR/$DB/inc" --db $DB -t $TABLE --ratelimit 5 --concurrency 4 --lastbackupts $last_backup_ts + +run_sql "DROP DATABASE $DB;" +# full restore +echo "full restore start..." +run_br restore table --db $DB --table $TABLE -s "local://$TEST_DIR/$DB/full" --pd $PD_ADDR +row_count_full=$(run_sql "SELECT COUNT(*) FROM $DB.$TABLE;" | awk '/COUNT/{print $2}') +# check full restore +if [ "${row_count_full}" != "${ROW_COUNT}" ];then + echo "TEST: [$TEST_NAME] full restore fail on database $DB" + exit 1 +fi +# incremental restore +echo "incremental restore start..." +run_br restore table --db $DB --table $TABLE -s "local://$TEST_DIR/$DB/inc" --pd $PD_ADDR +row_count_inc=$(run_sql "SELECT COUNT(*) FROM $DB.$TABLE;" | awk '/COUNT/{print $2}') +# check full restore +if [ "${row_count_inc}" != "${ROW_COUNT}" ];then + echo "TEST: [$TEST_NAME] incremental restore fail on database $DB" + exit 1 +fi +run_sql "INSERT INTO ${DB}.${TABLE}(c2) VALUES ('1');" + +run_sql "DROP DATABASE $DB;" diff --git a/tests/br_incremental_index/run.sh b/tests/br_incremental_index/run.sh new file mode 100755 index 000000000..f4b4b9de7 --- /dev/null +++ b/tests/br_incremental_index/run.sh @@ -0,0 +1,74 @@ +#!/bin/sh +# +# Copyright 2019 PingCAP, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# See the License for the specific language governing permissions and +# limitations under the License. + +set -eu +DB="$TEST_NAME" +TABLE="usertable" +ROW_COUNT=100 +PATH="tests/$TEST_NAME:bin:$PATH" + +echo "load data..." +# create database +run_sql "CREATE DATABASE IF NOT EXISTS $DB;" +# create table +run_sql "CREATE TABLE IF NOT EXISTS ${DB}.${TABLE} (c1 INT);" +# insert records +for i in $(seq $ROW_COUNT); do + run_sql "INSERT INTO ${DB}.${TABLE} VALUES ($i);" +done + +# full backup +echo "backup full start..." +run_sql "CREATE INDEX idx_c1 ON ${DB}.${TABLE}(c1)" & +run_br --pd $PD_ADDR backup full -s "local://$TEST_DIR/$DB/full" --ratelimit 5 --concurrency 4 +wait +# run ddls +echo "run ddls..." +run_sql "ALTER TABLE ${DB}.${TABLE} ADD COLUMN c2 INT NOT NULL;"; +run_sql "ALTER TABLE ${DB}.${TABLE} ADD COLUMN c3 INT NOT NULL;"; +run_sql "ALTER TABLE ${DB}.${TABLE} DROP COLUMN c3;"; +# incremental backup +echo "incremental backup start..." +last_backup_ts=$(br validate decode --field="end-version" -s "local://$TEST_DIR/$DB/full" | tail -n1) +run_br --pd $PD_ADDR backup table -s "local://$TEST_DIR/$DB/inc" --db $DB -t $TABLE --ratelimit 5 --concurrency 4 --lastbackupts $last_backup_ts + +run_sql "DROP DATABASE $DB;" +# full restore +echo "full restore start..." +run_br restore table --db $DB --table $TABLE -s "local://$TEST_DIR/$DB/full" --pd $PD_ADDR +row_count_full=$(run_sql "SELECT COUNT(*) FROM $DB.$TABLE;" | awk '/COUNT/{print $2}') +# check full restore +if [ "${row_count_full}" != "${ROW_COUNT}" ];then + echo "TEST: [$TEST_NAME] full restore fail on database $DB" + exit 1 +fi +# incremental restore +echo "incremental restore start..." +run_br restore table --db $DB --table $TABLE -s "local://$TEST_DIR/$DB/inc" --pd $PD_ADDR +row_count_inc=$(run_sql "SELECT COUNT(*) FROM $DB.$TABLE;" | awk '/COUNT/{print $2}') +# check full restore +if [ "${row_count_inc}" != "${ROW_COUNT}" ];then + echo "TEST: [$TEST_NAME] incremental restore fail on database $DB" + exit 1 +fi +run_sql "INSERT INTO ${DB}.${TABLE} VALUES (1, 1);" +row_count_insert=$(run_sql "SELECT COUNT(*) FROM $DB.$TABLE;" | awk '/COUNT/{print $2}') +# check insert count +if [ "${row_count_insert}" != "$(expr $row_count_inc + 1)" ];then + echo "TEST: [$TEST_NAME] insert record fail on database $DB" + exit 1 +fi + +run_sql "DROP DATABASE $DB;" diff --git a/tests/config/tikv.toml b/tests/config/tikv.toml index e93a16597..73323d878 100644 --- a/tests/config/tikv.toml +++ b/tests/config/tikv.toml @@ -11,3 +11,4 @@ max-open-files = 4096 [raftstore] # true (default value) for high reliability, this can prevent data loss when power failure. sync-log = false +capacity = "10GB" \ No newline at end of file diff --git a/tests/run.sh b/tests/run.sh index 3cedc7093..a4edb762a 100755 --- a/tests/run.sh +++ b/tests/run.sh @@ -24,6 +24,7 @@ TIDB_ADDR="127.0.0.1:4000" TIDB_STATUS_ADDR="127.0.0.1:10080" # actaul tikv_addr are TIKV_ADDR${i} TIKV_ADDR="127.0.0.1:2016" +TIKV_STATUS_ADDR="127.0.0.1:2018" TIKV_COUNT=4 stop_services() { @@ -55,6 +56,7 @@ start_services() { bin/tikv-server \ --pd "$PD_ADDR" \ -A "$TIKV_ADDR$i" \ + --status-addr "$TIKV_STATUS_ADDR$i" \ --log-file "$TEST_DIR/tikv${i}.log" \ -C "tests/config/tikv.toml" \ -s "$TEST_DIR/tikv${i}" &