Skip to content

Commit

Permalink
*: abstract the progress channel (updateCh) into the glue package (pi…
Browse files Browse the repository at this point in the history
…ngcap#196)

* *: abstract the progress channel (updateCh) into the glue package

* restore: fix crash in truncateTS() when the bound is unlimited

* task: fix comment

Co-authored-by: Ian <ArGregoryIan@gmail.com>
  • Loading branch information
kennytm and IANTHEREAL authored Mar 20, 2020
1 parent 4ea6c1c commit 2669204
Show file tree
Hide file tree
Showing 13 changed files with 101 additions and 36 deletions.
9 changes: 5 additions & 4 deletions pkg/backup/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ import (
"go.uber.org/zap"

"github.com/pingcap/br/pkg/conn"
"github.com/pingcap/br/pkg/glue"
"github.com/pingcap/br/pkg/rtree"
"github.com/pingcap/br/pkg/storage"
"github.com/pingcap/br/pkg/summary"
Expand Down Expand Up @@ -309,7 +310,7 @@ func (bc *Client) BackupRanges(
ctx context.Context,
ranges []rtree.Range,
req kvproto.BackupRequest,
updateCh chan<- struct{},
updateCh glue.Progress,
) error {
start := time.Now()
defer func() {
Expand Down Expand Up @@ -374,7 +375,7 @@ func (bc *Client) BackupRange(
ctx context.Context,
startKey, endKey []byte,
req kvproto.BackupRequest,
updateCh chan<- struct{},
updateCh glue.Progress,
) (err error) {
start := time.Now()
defer func() {
Expand Down Expand Up @@ -486,7 +487,7 @@ func (bc *Client) fineGrainedBackup(
rateLimit uint64,
concurrency uint32,
rangeTree rtree.RangeTree,
updateCh chan<- struct{},
updateCh glue.Progress,
) error {
bo := tikv.NewBackoffer(ctx, backupFineGrainedMaxBackoff)
for {
Expand Down Expand Up @@ -561,7 +562,7 @@ func (bc *Client) fineGrainedBackup(
rangeTree.Put(resp.StartKey, resp.EndKey, resp.Files)

// Update progress
updateCh <- struct{}{}
updateCh.Inc()
}
}

Expand Down
5 changes: 3 additions & 2 deletions pkg/backup/push.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"github.com/pingcap/log"
"go.uber.org/zap"

"github.com/pingcap/br/pkg/glue"
"github.com/pingcap/br/pkg/rtree"
)

Expand All @@ -38,7 +39,7 @@ func newPushDown(ctx context.Context, mgr ClientMgr, cap int) *pushDown {
func (push *pushDown) pushBackup(
req backup.BackupRequest,
stores []*metapb.Store,
updateCh chan<- struct{},
updateCh glue.Progress,
) (rtree.RangeTree, error) {
// Push down backup tasks to all tikv instances.
res := rtree.NewRangeTree()
Expand Down Expand Up @@ -90,7 +91,7 @@ func (push *pushDown) pushBackup(
resp.GetStartKey(), resp.GetEndKey(), resp.GetFiles())

// Update progress
updateCh <- struct{}{}
updateCh.Inc()
} else {
errPb := resp.GetError()
switch v := errPb.Detail.(type) {
Expand Down
7 changes: 4 additions & 3 deletions pkg/backup/schema.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"go.uber.org/zap"

"github.com/pingcap/br/pkg/checksum"
"github.com/pingcap/br/pkg/glue"
"github.com/pingcap/br/pkg/summary"
"github.com/pingcap/br/pkg/utils"
)
Expand Down Expand Up @@ -67,7 +68,7 @@ func (pending *Schemas) Start(
store kv.Storage,
backupTS uint64,
concurrency uint,
updateCh chan<- struct{},
updateCh glue.Progress,
) {
workerPool := utils.NewWorkerPool(concurrency, "Schemas")
go func() {
Expand All @@ -82,7 +83,7 @@ func (pending *Schemas) Start(

if pending.skipChecksum {
pending.backupSchemaCh <- schema
updateCh <- struct{}{}
updateCh.Inc()
return
}

Expand Down Expand Up @@ -110,7 +111,7 @@ func (pending *Schemas) Start(
zap.Duration("take", time.Since(start)))
pending.backupSchemaCh <- schema

updateCh <- struct{}{}
updateCh.Inc()
})
}
pending.wg.Wait()
Expand Down
27 changes: 23 additions & 4 deletions pkg/backup/schema_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ package backup
import (
"context"
"math"
"sync/atomic"

. "github.com/pingcap/check"
"github.com/pingcap/tidb-tools/pkg/filter"
Expand All @@ -30,6 +31,24 @@ func (s *testBackupSchemaSuite) TearDownSuite(c *C) {
testleak.AfterTest(c)()
}

type simpleProgress struct {
counter int64
}

func (sp *simpleProgress) Inc() {
atomic.AddInt64(&sp.counter, 1)
}

func (sp *simpleProgress) Close() {}

func (sp *simpleProgress) reset() {
atomic.StoreInt64(&sp.counter, 0)
}

func (sp *simpleProgress) get() int64 {
return atomic.LoadInt64(&sp.counter)
}

func (s *testBackupSchemaSuite) TestBuildBackupRangeAndSchema(c *C) {
c.Assert(s.mock.Start(), IsNil)
defer s.mock.Stop()
Expand Down Expand Up @@ -73,10 +92,10 @@ func (s *testBackupSchemaSuite) TestBuildBackupRangeAndSchema(c *C) {
s.mock.Domain, s.mock.Storage, testFilter, math.MaxUint64)
c.Assert(err, IsNil)
c.Assert(backupSchemas.Len(), Equals, 1)
updateCh := make(chan struct{}, 2)
updateCh := new(simpleProgress)
backupSchemas.Start(context.Background(), s.mock.Storage, math.MaxUint64, 1, updateCh)
schemas, err := backupSchemas.finishTableChecksum()
<-updateCh
c.Assert(updateCh.get(), Equals, int64(1))
c.Assert(err, IsNil)
c.Assert(len(schemas), Equals, 1)
// Cluster returns a dummy checksum (all fields are 1).
Expand All @@ -93,10 +112,10 @@ func (s *testBackupSchemaSuite) TestBuildBackupRangeAndSchema(c *C) {
s.mock.Domain, s.mock.Storage, noFilter, math.MaxUint64)
c.Assert(err, IsNil)
c.Assert(backupSchemas.Len(), Equals, 2)
updateCh.reset()
backupSchemas.Start(context.Background(), s.mock.Storage, math.MaxUint64, 2, updateCh)
schemas, err = backupSchemas.finishTableChecksum()
<-updateCh
<-updateCh
c.Assert(updateCh.get(), Equals, int64(2))
c.Assert(err, IsNil)
c.Assert(len(schemas), Equals, 2)
// Cluster returns a dummy checksum (all fields are 1).
Expand Down
12 changes: 12 additions & 0 deletions pkg/glue/glue.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@ type Glue interface {
// OwnsStorage returns whether the storage returned by Open() is owned
// If this method returns false, the connection manager will never close the storage.
OwnsStorage() bool

StartProgress(ctx context.Context, cmdName string, total int64, redirectLog bool) Progress
}

// Session is an abstraction of the session.Session interface.
Expand All @@ -30,3 +32,13 @@ type Session interface {
ShowCreateTable(table *model.TableInfo, allocator autoid.Allocator) (string, error)
Close()
}

// Progress is an interface recording the current execution progress.
type Progress interface {
// Inc increases the progress. This method must be goroutine-safe, and can
// be called from any goroutine.
Inc()
// Close marks the progress as 100% complete and that Inc() can no longer be
// called.
Close()
}
5 changes: 5 additions & 0 deletions pkg/gluetidb/glue.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,11 @@ func (Glue) OwnsStorage() bool {
return true
}

// StartProgress implements glue.Glue
func (g Glue) StartProgress(ctx context.Context, cmdName string, total int64, redirectLog bool) glue.Progress {
return g.tikvGlue.StartProgress(ctx, cmdName, total, redirectLog)
}

// Execute implements glue.Session
func (gs *tidbSession) Execute(ctx context.Context, sql string) error {
_, err := gs.se.Execute(ctx, sql)
Expand Down
22 changes: 22 additions & 0 deletions pkg/gluetikv/glue.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,16 @@
package gluetikv

import (
"context"

pd "github.com/pingcap/pd/v4/client"
"github.com/pingcap/tidb/config"
"github.com/pingcap/tidb/domain"
"github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/store/tikv"

"github.com/pingcap/br/pkg/glue"
"github.com/pingcap/br/pkg/utils"
)

// Glue is an implementation of glue.Glue that accesses only TiKV without TiDB.
Expand Down Expand Up @@ -41,3 +44,22 @@ func (Glue) Open(path string, option pd.SecurityOption) (kv.Storage, error) {
func (Glue) OwnsStorage() bool {
return true
}

// StartProgress implements glue.Glue
func (Glue) StartProgress(ctx context.Context, cmdName string, total int64, redirectLog bool) glue.Progress {
return progress{ch: utils.StartProgress(ctx, cmdName, total, redirectLog)}
}

type progress struct {
ch chan<- struct{}
}

// Inc implements glue.Progress
func (p progress) Inc() {
p.ch <- struct{}{}
}

// Close implements glue.Progress
func (p progress) Close() {
close(p.ch)
}
12 changes: 6 additions & 6 deletions pkg/restore/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -445,7 +445,7 @@ func (rc *Client) setSpeedLimit() error {
func (rc *Client) RestoreFiles(
files []*backup.File,
rewriteRules *RewriteRules,
updateCh chan<- struct{},
updateCh glue.Progress,
) (err error) {
start := time.Now()
defer func() {
Expand Down Expand Up @@ -478,7 +478,7 @@ func (rc *Client) RestoreFiles(
case <-rc.ctx.Done():
errCh <- nil
case errCh <- rc.fileImporter.Import(fileReplica, rewriteRules):
updateCh <- struct{}{}
updateCh.Inc()
}
})
}
Expand All @@ -499,7 +499,7 @@ func (rc *Client) RestoreFiles(
}

// RestoreRaw tries to restore raw keys in the specified range.
func (rc *Client) RestoreRaw(startKey []byte, endKey []byte, files []*backup.File, updateCh chan<- struct{}) error {
func (rc *Client) RestoreRaw(startKey []byte, endKey []byte, files []*backup.File, updateCh glue.Progress) error {
start := time.Now()
defer func() {
elapsed := time.Since(start)
Expand Down Expand Up @@ -529,7 +529,7 @@ func (rc *Client) RestoreRaw(startKey []byte, endKey []byte, files []*backup.Fil
case <-rc.ctx.Done():
errCh <- nil
case errCh <- rc.fileImporter.Import(fileReplica, emptyRules):
updateCh <- struct{}{}
updateCh.Inc()
}
})
}
Expand Down Expand Up @@ -617,7 +617,7 @@ func (rc *Client) ValidateChecksum(
kvClient kv.Client,
tables []*utils.Table,
newTables []*model.TableInfo,
updateCh chan<- struct{},
updateCh glue.Progress,
) error {
start := time.Now()
defer func() {
Expand Down Expand Up @@ -674,7 +674,7 @@ func (rc *Client) ValidateChecksum(
return
}

updateCh <- struct{}{}
updateCh.Inc()
})
}
wg.Wait()
Expand Down
8 changes: 6 additions & 2 deletions pkg/restore/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"go.uber.org/zap"

"github.com/pingcap/br/pkg/conn"
"github.com/pingcap/br/pkg/glue"
"github.com/pingcap/br/pkg/rtree"
"github.com/pingcap/br/pkg/summary"
)
Expand Down Expand Up @@ -309,6 +310,9 @@ func matchNewPrefix(key []byte, rewriteRules *RewriteRules) *import_sstpb.Rewrit
}

func truncateTS(key []byte) []byte {
if len(key) == 0 {
return nil
}
return key[:len(key)-8]
}

Expand All @@ -320,7 +324,7 @@ func SplitRanges(
client *Client,
ranges []rtree.Range,
rewriteRules *RewriteRules,
updateCh chan<- struct{},
updateCh glue.Progress,
) error {
start := time.Now()
defer func() {
Expand All @@ -339,7 +343,7 @@ func SplitRanges(

return splitter.Split(ctx, ranges, rewriteRules, storeMap, func(keys [][]byte) {
for range keys {
updateCh <- struct{}{}
updateCh.Inc()
}
})
}
Expand Down
9 changes: 4 additions & 5 deletions pkg/task/backup.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@ import (
"github.com/pingcap/br/pkg/glue"
"github.com/pingcap/br/pkg/storage"
"github.com/pingcap/br/pkg/summary"
"github.com/pingcap/br/pkg/utils"
)

const (
Expand Down Expand Up @@ -160,7 +159,7 @@ func RunBackup(c context.Context, g glue.Glue, cmdName string, cfg *BackupConfig

// Backup
// Redirect to log if there is no log file to avoid unreadable output.
updateCh := utils.StartProgress(
updateCh := g.StartProgress(
ctx, cmdName, int64(approximateRegions), !cfg.LogProgress)

req := kvproto.BackupRequest{
Expand All @@ -175,14 +174,14 @@ func RunBackup(c context.Context, g glue.Glue, cmdName string, cfg *BackupConfig
return err
}
// Backup has finished
close(updateCh)
updateCh.Close()

// Checksum
backupSchemasConcurrency := backup.DefaultSchemaConcurrency
if backupSchemas.Len() < backupSchemasConcurrency {
backupSchemasConcurrency = backupSchemas.Len()
}
updateCh = utils.StartProgress(
updateCh = g.StartProgress(
ctx, "Checksum", int64(backupSchemas.Len()), !cfg.LogProgress)
backupSchemas.SetSkipChecksum(!cfg.Checksum)
backupSchemas.Start(
Expand All @@ -209,7 +208,7 @@ func RunBackup(c context.Context, g glue.Glue, cmdName string, cfg *BackupConfig
log.Info("Skip fast checksum in incremental backup")
}
// Checksum has finished
close(updateCh)
updateCh.Close()

err = client.SaveBackupMeta(ctx, ddlJobs)
if err != nil {
Expand Down
4 changes: 2 additions & 2 deletions pkg/task/backup_raw.go
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,7 @@ func RunBackupRaw(c context.Context, g glue.Glue, cmdName string, cfg *RawKvConf

// Backup
// Redirect to log if there is no log file to avoid unreadable output.
updateCh := utils.StartProgress(
updateCh := g.StartProgress(
ctx, cmdName, int64(approximateRegions), !cfg.LogProgress)

req := kvproto.BackupRequest{
Expand All @@ -134,7 +134,7 @@ func RunBackupRaw(c context.Context, g glue.Glue, cmdName string, cfg *RawKvConf
return err
}
// Backup has finished
close(updateCh)
updateCh.Close()

// Checksum
err = client.SaveBackupMeta(ctx, nil)
Expand Down
Loading

0 comments on commit 2669204

Please sign in to comment.