diff --git a/pkg/backup/client.go b/pkg/backup/client.go index 9693b6b5f..219f58550 100644 --- a/pkg/backup/client.go +++ b/pkg/backup/client.go @@ -32,6 +32,7 @@ import ( "go.uber.org/zap" "github.com/pingcap/br/pkg/conn" + "github.com/pingcap/br/pkg/glue" "github.com/pingcap/br/pkg/rtree" "github.com/pingcap/br/pkg/storage" "github.com/pingcap/br/pkg/summary" @@ -309,7 +310,7 @@ func (bc *Client) BackupRanges( ctx context.Context, ranges []rtree.Range, req kvproto.BackupRequest, - updateCh chan<- struct{}, + updateCh glue.Progress, ) error { start := time.Now() defer func() { @@ -374,7 +375,7 @@ func (bc *Client) BackupRange( ctx context.Context, startKey, endKey []byte, req kvproto.BackupRequest, - updateCh chan<- struct{}, + updateCh glue.Progress, ) (err error) { start := time.Now() defer func() { @@ -486,7 +487,7 @@ func (bc *Client) fineGrainedBackup( rateLimit uint64, concurrency uint32, rangeTree rtree.RangeTree, - updateCh chan<- struct{}, + updateCh glue.Progress, ) error { bo := tikv.NewBackoffer(ctx, backupFineGrainedMaxBackoff) for { @@ -561,7 +562,7 @@ func (bc *Client) fineGrainedBackup( rangeTree.Put(resp.StartKey, resp.EndKey, resp.Files) // Update progress - updateCh <- struct{}{} + updateCh.Inc() } } diff --git a/pkg/backup/push.go b/pkg/backup/push.go index 4aaffa7e2..d329f7088 100644 --- a/pkg/backup/push.go +++ b/pkg/backup/push.go @@ -12,6 +12,7 @@ import ( "github.com/pingcap/log" "go.uber.org/zap" + "github.com/pingcap/br/pkg/glue" "github.com/pingcap/br/pkg/rtree" ) @@ -38,7 +39,7 @@ func newPushDown(ctx context.Context, mgr ClientMgr, cap int) *pushDown { func (push *pushDown) pushBackup( req backup.BackupRequest, stores []*metapb.Store, - updateCh chan<- struct{}, + updateCh glue.Progress, ) (rtree.RangeTree, error) { // Push down backup tasks to all tikv instances. res := rtree.NewRangeTree() @@ -90,7 +91,7 @@ func (push *pushDown) pushBackup( resp.GetStartKey(), resp.GetEndKey(), resp.GetFiles()) // Update progress - updateCh <- struct{}{} + updateCh.Inc() } else { errPb := resp.GetError() switch v := errPb.Detail.(type) { diff --git a/pkg/backup/schema.go b/pkg/backup/schema.go index 18583d094..73a62477d 100644 --- a/pkg/backup/schema.go +++ b/pkg/backup/schema.go @@ -18,6 +18,7 @@ import ( "go.uber.org/zap" "github.com/pingcap/br/pkg/checksum" + "github.com/pingcap/br/pkg/glue" "github.com/pingcap/br/pkg/summary" "github.com/pingcap/br/pkg/utils" ) @@ -67,7 +68,7 @@ func (pending *Schemas) Start( store kv.Storage, backupTS uint64, concurrency uint, - updateCh chan<- struct{}, + updateCh glue.Progress, ) { workerPool := utils.NewWorkerPool(concurrency, "Schemas") go func() { @@ -82,7 +83,7 @@ func (pending *Schemas) Start( if pending.skipChecksum { pending.backupSchemaCh <- schema - updateCh <- struct{}{} + updateCh.Inc() return } @@ -110,7 +111,7 @@ func (pending *Schemas) Start( zap.Duration("take", time.Since(start))) pending.backupSchemaCh <- schema - updateCh <- struct{}{} + updateCh.Inc() }) } pending.wg.Wait() diff --git a/pkg/backup/schema_test.go b/pkg/backup/schema_test.go index 3b3bef897..d3c82f172 100644 --- a/pkg/backup/schema_test.go +++ b/pkg/backup/schema_test.go @@ -5,6 +5,7 @@ package backup import ( "context" "math" + "sync/atomic" . "github.com/pingcap/check" "github.com/pingcap/tidb-tools/pkg/filter" @@ -30,6 +31,24 @@ func (s *testBackupSchemaSuite) TearDownSuite(c *C) { testleak.AfterTest(c)() } +type simpleProgress struct { + counter int64 +} + +func (sp *simpleProgress) Inc() { + atomic.AddInt64(&sp.counter, 1) +} + +func (sp *simpleProgress) Close() {} + +func (sp *simpleProgress) reset() { + atomic.StoreInt64(&sp.counter, 0) +} + +func (sp *simpleProgress) get() int64 { + return atomic.LoadInt64(&sp.counter) +} + func (s *testBackupSchemaSuite) TestBuildBackupRangeAndSchema(c *C) { c.Assert(s.mock.Start(), IsNil) defer s.mock.Stop() @@ -73,10 +92,10 @@ func (s *testBackupSchemaSuite) TestBuildBackupRangeAndSchema(c *C) { s.mock.Domain, s.mock.Storage, testFilter, math.MaxUint64) c.Assert(err, IsNil) c.Assert(backupSchemas.Len(), Equals, 1) - updateCh := make(chan struct{}, 2) + updateCh := new(simpleProgress) backupSchemas.Start(context.Background(), s.mock.Storage, math.MaxUint64, 1, updateCh) schemas, err := backupSchemas.finishTableChecksum() - <-updateCh + c.Assert(updateCh.get(), Equals, int64(1)) c.Assert(err, IsNil) c.Assert(len(schemas), Equals, 1) // Cluster returns a dummy checksum (all fields are 1). @@ -93,10 +112,10 @@ func (s *testBackupSchemaSuite) TestBuildBackupRangeAndSchema(c *C) { s.mock.Domain, s.mock.Storage, noFilter, math.MaxUint64) c.Assert(err, IsNil) c.Assert(backupSchemas.Len(), Equals, 2) + updateCh.reset() backupSchemas.Start(context.Background(), s.mock.Storage, math.MaxUint64, 2, updateCh) schemas, err = backupSchemas.finishTableChecksum() - <-updateCh - <-updateCh + c.Assert(updateCh.get(), Equals, int64(2)) c.Assert(err, IsNil) c.Assert(len(schemas), Equals, 2) // Cluster returns a dummy checksum (all fields are 1). diff --git a/pkg/glue/glue.go b/pkg/glue/glue.go index f2f3ff55e..88a05c5c3 100644 --- a/pkg/glue/glue.go +++ b/pkg/glue/glue.go @@ -21,6 +21,8 @@ type Glue interface { // OwnsStorage returns whether the storage returned by Open() is owned // If this method returns false, the connection manager will never close the storage. OwnsStorage() bool + + StartProgress(ctx context.Context, cmdName string, total int64, redirectLog bool) Progress } // Session is an abstraction of the session.Session interface. @@ -30,3 +32,13 @@ type Session interface { ShowCreateTable(table *model.TableInfo, allocator autoid.Allocator) (string, error) Close() } + +// Progress is an interface recording the current execution progress. +type Progress interface { + // Inc increases the progress. This method must be goroutine-safe, and can + // be called from any goroutine. + Inc() + // Close marks the progress as 100% complete and that Inc() can no longer be + // called. + Close() +} diff --git a/pkg/gluetidb/glue.go b/pkg/gluetidb/glue.go index 80756d2c2..5f4aff6fa 100644 --- a/pkg/gluetidb/glue.go +++ b/pkg/gluetidb/glue.go @@ -51,6 +51,11 @@ func (Glue) OwnsStorage() bool { return true } +// StartProgress implements glue.Glue +func (g Glue) StartProgress(ctx context.Context, cmdName string, total int64, redirectLog bool) glue.Progress { + return g.tikvGlue.StartProgress(ctx, cmdName, total, redirectLog) +} + // Execute implements glue.Session func (gs *tidbSession) Execute(ctx context.Context, sql string) error { _, err := gs.se.Execute(ctx, sql) diff --git a/pkg/gluetikv/glue.go b/pkg/gluetikv/glue.go index e63b35b95..ab3c0d57f 100644 --- a/pkg/gluetikv/glue.go +++ b/pkg/gluetikv/glue.go @@ -3,6 +3,8 @@ package gluetikv import ( + "context" + pd "github.com/pingcap/pd/v4/client" "github.com/pingcap/tidb/config" "github.com/pingcap/tidb/domain" @@ -10,6 +12,7 @@ import ( "github.com/pingcap/tidb/store/tikv" "github.com/pingcap/br/pkg/glue" + "github.com/pingcap/br/pkg/utils" ) // Glue is an implementation of glue.Glue that accesses only TiKV without TiDB. @@ -41,3 +44,22 @@ func (Glue) Open(path string, option pd.SecurityOption) (kv.Storage, error) { func (Glue) OwnsStorage() bool { return true } + +// StartProgress implements glue.Glue +func (Glue) StartProgress(ctx context.Context, cmdName string, total int64, redirectLog bool) glue.Progress { + return progress{ch: utils.StartProgress(ctx, cmdName, total, redirectLog)} +} + +type progress struct { + ch chan<- struct{} +} + +// Inc implements glue.Progress +func (p progress) Inc() { + p.ch <- struct{}{} +} + +// Close implements glue.Progress +func (p progress) Close() { + close(p.ch) +} diff --git a/pkg/restore/client.go b/pkg/restore/client.go index 2453f2974..6b88f5a6d 100644 --- a/pkg/restore/client.go +++ b/pkg/restore/client.go @@ -445,7 +445,7 @@ func (rc *Client) setSpeedLimit() error { func (rc *Client) RestoreFiles( files []*backup.File, rewriteRules *RewriteRules, - updateCh chan<- struct{}, + updateCh glue.Progress, ) (err error) { start := time.Now() defer func() { @@ -478,7 +478,7 @@ func (rc *Client) RestoreFiles( case <-rc.ctx.Done(): errCh <- nil case errCh <- rc.fileImporter.Import(fileReplica, rewriteRules): - updateCh <- struct{}{} + updateCh.Inc() } }) } @@ -499,7 +499,7 @@ func (rc *Client) RestoreFiles( } // RestoreRaw tries to restore raw keys in the specified range. -func (rc *Client) RestoreRaw(startKey []byte, endKey []byte, files []*backup.File, updateCh chan<- struct{}) error { +func (rc *Client) RestoreRaw(startKey []byte, endKey []byte, files []*backup.File, updateCh glue.Progress) error { start := time.Now() defer func() { elapsed := time.Since(start) @@ -529,7 +529,7 @@ func (rc *Client) RestoreRaw(startKey []byte, endKey []byte, files []*backup.Fil case <-rc.ctx.Done(): errCh <- nil case errCh <- rc.fileImporter.Import(fileReplica, emptyRules): - updateCh <- struct{}{} + updateCh.Inc() } }) } @@ -617,7 +617,7 @@ func (rc *Client) ValidateChecksum( kvClient kv.Client, tables []*utils.Table, newTables []*model.TableInfo, - updateCh chan<- struct{}, + updateCh glue.Progress, ) error { start := time.Now() defer func() { @@ -674,7 +674,7 @@ func (rc *Client) ValidateChecksum( return } - updateCh <- struct{}{} + updateCh.Inc() }) } wg.Wait() diff --git a/pkg/restore/util.go b/pkg/restore/util.go index c49c07994..698de6aec 100644 --- a/pkg/restore/util.go +++ b/pkg/restore/util.go @@ -22,6 +22,7 @@ import ( "go.uber.org/zap" "github.com/pingcap/br/pkg/conn" + "github.com/pingcap/br/pkg/glue" "github.com/pingcap/br/pkg/rtree" "github.com/pingcap/br/pkg/summary" ) @@ -309,6 +310,9 @@ func matchNewPrefix(key []byte, rewriteRules *RewriteRules) *import_sstpb.Rewrit } func truncateTS(key []byte) []byte { + if len(key) == 0 { + return nil + } return key[:len(key)-8] } @@ -320,7 +324,7 @@ func SplitRanges( client *Client, ranges []rtree.Range, rewriteRules *RewriteRules, - updateCh chan<- struct{}, + updateCh glue.Progress, ) error { start := time.Now() defer func() { @@ -339,7 +343,7 @@ func SplitRanges( return splitter.Split(ctx, ranges, rewriteRules, storeMap, func(keys [][]byte) { for range keys { - updateCh <- struct{}{} + updateCh.Inc() } }) } diff --git a/pkg/task/backup.go b/pkg/task/backup.go index 5944a22a0..bf90c6739 100644 --- a/pkg/task/backup.go +++ b/pkg/task/backup.go @@ -24,7 +24,6 @@ import ( "github.com/pingcap/br/pkg/glue" "github.com/pingcap/br/pkg/storage" "github.com/pingcap/br/pkg/summary" - "github.com/pingcap/br/pkg/utils" ) const ( @@ -160,7 +159,7 @@ func RunBackup(c context.Context, g glue.Glue, cmdName string, cfg *BackupConfig // Backup // Redirect to log if there is no log file to avoid unreadable output. - updateCh := utils.StartProgress( + updateCh := g.StartProgress( ctx, cmdName, int64(approximateRegions), !cfg.LogProgress) req := kvproto.BackupRequest{ @@ -175,14 +174,14 @@ func RunBackup(c context.Context, g glue.Glue, cmdName string, cfg *BackupConfig return err } // Backup has finished - close(updateCh) + updateCh.Close() // Checksum backupSchemasConcurrency := backup.DefaultSchemaConcurrency if backupSchemas.Len() < backupSchemasConcurrency { backupSchemasConcurrency = backupSchemas.Len() } - updateCh = utils.StartProgress( + updateCh = g.StartProgress( ctx, "Checksum", int64(backupSchemas.Len()), !cfg.LogProgress) backupSchemas.SetSkipChecksum(!cfg.Checksum) backupSchemas.Start( @@ -209,7 +208,7 @@ func RunBackup(c context.Context, g glue.Glue, cmdName string, cfg *BackupConfig log.Info("Skip fast checksum in incremental backup") } // Checksum has finished - close(updateCh) + updateCh.Close() err = client.SaveBackupMeta(ctx, ddlJobs) if err != nil { diff --git a/pkg/task/backup_raw.go b/pkg/task/backup_raw.go index d9deaccba..e879523b5 100644 --- a/pkg/task/backup_raw.go +++ b/pkg/task/backup_raw.go @@ -117,7 +117,7 @@ func RunBackupRaw(c context.Context, g glue.Glue, cmdName string, cfg *RawKvConf // Backup // Redirect to log if there is no log file to avoid unreadable output. - updateCh := utils.StartProgress( + updateCh := g.StartProgress( ctx, cmdName, int64(approximateRegions), !cfg.LogProgress) req := kvproto.BackupRequest{ @@ -134,7 +134,7 @@ func RunBackupRaw(c context.Context, g glue.Glue, cmdName string, cfg *RawKvConf return err } // Backup has finished - close(updateCh) + updateCh.Close() // Checksum err = client.SaveBackupMeta(ctx, nil) diff --git a/pkg/task/restore.go b/pkg/task/restore.go index 7d5dd6846..c75d53f89 100644 --- a/pkg/task/restore.go +++ b/pkg/task/restore.go @@ -171,7 +171,7 @@ func RunRestore(c context.Context, g glue.Glue, cmdName string, cfg *RestoreConf ranges = restore.AttachFilesToRanges(files, ranges) // Redirect to log if there is no log file to avoid unreadable output. - updateCh := utils.StartProgress( + updateCh := g.StartProgress( ctx, cmdName, // Split/Scatter + Download/Ingest @@ -239,17 +239,17 @@ func RunRestore(c context.Context, g glue.Glue, cmdName string, cfg *RestoreConf } // Restore has finished. - close(updateCh) + updateCh.Close() // Checksum - updateCh = utils.StartProgress( + updateCh = g.StartProgress( ctx, "Checksum", int64(len(newTables)), !cfg.LogProgress) err = client.ValidateChecksum( ctx, mgr.GetTiKV().GetClient(), tables, newTables, updateCh) if err != nil { return err } - close(updateCh) + updateCh.Close() return nil } @@ -399,7 +399,7 @@ func RunRestoreTiflashReplica(c context.Context, g glue.Glue, cmdName string, cf for _, db := range dbs { tables = append(tables, db.Tables...) } - updateCh := utils.StartProgress( + updateCh := g.StartProgress( ctx, "RecoverTiflashReplica", int64(len(tables)), !cfg.LogProgress) for _, t := range tables { log.Info("get table", zap.Stringer("name", t.Info.Name), @@ -409,9 +409,10 @@ func RunRestoreTiflashReplica(c context.Context, g glue.Glue, cmdName string, cf if err != nil { return err } - updateCh <- struct{}{} + updateCh.Inc() } } + updateCh.Close() summary.CollectInt("recover tables", len(tables)) return nil diff --git a/pkg/task/restore_raw.go b/pkg/task/restore_raw.go index 308a44b4e..ccd6bd17a 100644 --- a/pkg/task/restore_raw.go +++ b/pkg/task/restore_raw.go @@ -97,7 +97,7 @@ func RunRestoreRaw(c context.Context, g glue.Glue, cmdName string, cfg *RestoreR // Redirect to log if there is no log file to avoid unreadable output. // TODO: How to show progress? - updateCh := utils.StartProgress( + updateCh := g.StartProgress( ctx, "Raw Restore", // Split/Scatter + Download/Ingest @@ -124,7 +124,7 @@ func RunRestoreRaw(c context.Context, g glue.Glue, cmdName string, cfg *RestoreR return errors.Trace(err) } // Restore has finished. - close(updateCh) + updateCh.Close() return nil }