
Commit

Log Backup: decouple log backup resolve locks from GCWorker. (pingcap…
ti-chi-bot committed Sep 12, 2023
1 parent be57cf0 commit ec0ffe8
Showing 15 changed files with 658 additions and 425 deletions.
4 changes: 2 additions & 2 deletions DEPS.bzl
@@ -3703,8 +3703,8 @@ def go_deps():
name = "com_github_tikv_client_go_v2",
build_file_proto_mode = "disable_global",
importpath = "github.com/tikv/client-go/v2",
sum = "h1:2W6QJ+Dh8SreeR4Z3xHXZLwdhIf1FHQwoMNBBBTn5Jg=",
version = "v2.0.4-0.20230905091602-cf19ede9ecd6",
sum = "h1:pvHtHnUDfqMAU3/F8JVpuuG86E/lemJWZq0iSCV3kKY=",
version = "v2.0.4-0.20230912041415-9c163cc8574b",
)
go_repository(
name = "com_github_tikv_pd_client",
5 changes: 5 additions & 0 deletions br/pkg/conn/conn.go
@@ -244,6 +244,11 @@ func (mgr *Mgr) GetTLSConfig() *tls.Config {
return mgr.StoreManager.TLSConfig()
}

// GetStore gets the tikvStore.
func (mgr *Mgr) GetStore() tikv.Storage {
return mgr.tikvStore
}

// GetLockResolver gets the LockResolver.
func (mgr *Mgr) GetLockResolver() *txnlock.LockResolver {
return mgr.tikvStore.GetLockResolver()
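The new GetStore accessor is what allows lock resolution to run outside of GCWorker: any holder of an Mgr can hand the tikv.Storage to a rangetask runner. Below is a minimal sketch of that pattern (not part of the commit); the function name resolveOnRange and the mgr, startKey, and endKey values are assumptions for illustration, and the handler only marks the region as completed instead of actually resolving locks.

package example

import (
    "context"

    "github.com/pingcap/tidb/br/pkg/conn"
    tikvstore "github.com/tikv/client-go/v2/kv"
    "github.com/tikv/client-go/v2/txnkv/rangetask"
)

// resolveOnRange shows how a tikv.Storage obtained from Mgr.GetStore() can drive a
// range task, the same pattern the advancer uses for resolving locks in this commit.
func resolveOnRange(ctx context.Context, mgr *conn.Mgr, startKey, endKey []byte) error {
    handler := func(ctx context.Context, r tikvstore.KeyRange) (rangetask.TaskStat, error) {
        // A real handler would scan locks in [r.StartKey, r.EndKey) and resolve them.
        return rangetask.TaskStat{CompletedRegions: 1}, nil
    }
    runner := rangetask.NewRangeTaskRunner("example-resolve-locks", mgr.GetStore(), 1, handler)
    return runner.RunOnRange(ctx, startKey, endKey)
}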
9 changes: 9 additions & 0 deletions br/pkg/streamhelper/BUILD.bazel
@@ -41,6 +41,9 @@ go_library(
"@com_github_pingcap_log//:log",
"@com_github_tikv_client_go_v2//kv",
"@com_github_tikv_client_go_v2//oracle",
"@com_github_tikv_client_go_v2//tikv",
"@com_github_tikv_client_go_v2//txnkv/rangetask",
"@com_github_tikv_client_go_v2//txnkv/txnlock",
"@com_github_tikv_pd_client//:client",
"@io_etcd_go_etcd_client_v3//:client",
"@org_golang_google_grpc//:grpc",
@@ -82,18 +85,24 @@ go_test(
"@com_github_pingcap_failpoint//:failpoint",
"@com_github_pingcap_kvproto//pkg/brpb",
"@com_github_pingcap_kvproto//pkg/errorpb",
"@com_github_pingcap_kvproto//pkg/kvrpcpb",
"@com_github_pingcap_kvproto//pkg/logbackuppb",
"@com_github_pingcap_kvproto//pkg/metapb",
"@com_github_pingcap_log//:log",
"@com_github_stretchr_testify//require",
"@com_github_tikv_client_go_v2//kv",
"@com_github_tikv_client_go_v2//tikv",
"@com_github_tikv_client_go_v2//tikvrpc",
"@com_github_tikv_client_go_v2//txnkv/txnlock",
"@com_github_tikv_pd_client//:client",
"@io_etcd_go_etcd_client_v3//:client",
"@io_etcd_go_etcd_server_v3//embed",
"@io_etcd_go_etcd_server_v3//mvcc",
"@org_golang_google_grpc//:grpc",
"@org_golang_google_grpc//codes",
"@org_golang_google_grpc//metadata",
"@org_golang_google_grpc//status",
"@org_uber_go_atomic//:atomic",
"@org_uber_go_zap//:zap",
"@org_uber_go_zap//zapcore",
],
218 changes: 186 additions & 32 deletions br/pkg/streamhelper/advancer.go
@@ -3,12 +3,16 @@
package streamhelper

import (
"bytes"
"context"
"math"
"strings"
"sync"
"sync/atomic"
"time"

"github.com/pingcap/errors"
"github.com/pingcap/failpoint"
backuppb "github.com/pingcap/kvproto/pkg/brpb"
"github.com/pingcap/log"
"github.com/pingcap/tidb/br/pkg/logutil"
@@ -17,7 +21,10 @@ import (
"github.com/pingcap/tidb/br/pkg/utils"
"github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/metrics"
tikvstore "github.com/tikv/client-go/v2/kv"
"github.com/tikv/client-go/v2/oracle"
"github.com/tikv/client-go/v2/tikv"
"github.com/tikv/client-go/v2/txnkv/rangetask"
"go.uber.org/multierr"
"go.uber.org/zap"
"golang.org/x/sync/errgroup"
@@ -60,7 +67,9 @@ type CheckpointAdvancer struct {

// the cached last checkpoint.
// if there is no progress, this cache helps us avoid sending useless requests.
lastCheckpoint uint64
lastCheckpoint *checkpoint
lastCheckpointMu sync.Mutex
inResolvingLock atomic.Bool

checkpoints *spans.ValueSortedFull
checkpointsMu sync.Mutex
@@ -69,6 +78,53 @@ type CheckpointAdvancer struct {
subscriberMu sync.Mutex
}

// checkpoint represents the TS of a specific range.
// It is only used in advancer.go.
type checkpoint struct {
StartKey []byte
EndKey []byte
TS uint64

// It would be better to use a PD timestamp in the future; for now,
// local time is good enough for deciding when to resolve locks.
resolveLockTime time.Time
}

func newCheckpointWithTS(ts uint64) *checkpoint {
return &checkpoint{
TS: ts,
resolveLockTime: time.Now(),
}
}

func NewCheckpointWithSpan(s spans.Valued) *checkpoint {
return &checkpoint{
StartKey: s.Key.StartKey,
EndKey: s.Key.EndKey,
TS: s.Value,
resolveLockTime: time.Now(),
}
}

func (c *checkpoint) safeTS() uint64 {
return c.TS - 1
}

func (c *checkpoint) equal(o *checkpoint) bool {
return bytes.Equal(c.StartKey, o.StartKey) &&
bytes.Equal(c.EndKey, o.EndKey) && c.TS == o.TS
}

// If a checkpoint stays unchanged for too long (3 minutes),
// we should try to resolve locks for its range
// to keep the RPO within 5 minutes.
func (c *checkpoint) needResolveLocks() bool {
failpoint.Inject("NeedResolveLocks", func(val failpoint.Value) {
failpoint.Return(val.(bool))
})
return time.Since(c.resolveLockTime) > 3*time.Minute
}
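Taken together, these helpers gate lock resolution on how long a checkpoint has been stuck rather than on GC progress. The snippet below is a hypothetical internal test (package streamhelper, not part of the commit) that only illustrates the gate; it assumes the standard testing, time, require, and oracle imports.

func checkpointStalenessSketch(t *testing.T) {
    cp := newCheckpointWithTS(oracle.GoTimeToTS(time.Now()))
    require.False(t, cp.needResolveLocks()) // freshly created: no resolving needed yet
    cp.resolveLockTime = time.Now().Add(-4 * time.Minute) // simulate 4 minutes without progress
    require.True(t, cp.needResolveLocks()) // stuck for more than 3 minutes: resolve locks for this range
    require.Equal(t, cp.TS-1, cp.safeTS()) // safeTS is defined as TS - 1
}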

// NewCheckpointAdvancer creates a checkpoint advancer with the env.
func NewCheckpointAdvancer(env Env) *CheckpointAdvancer {
return &CheckpointAdvancer{
@@ -92,11 +148,23 @@ func (c *CheckpointAdvancer) UpdateConfigWith(f func(*config.Config)) {
c.UpdateConfig(cfg)
}

// UpdateLastCheckpoint modifies the checkpoint during ticking.
func (c *CheckpointAdvancer) UpdateLastCheckpoint(p *checkpoint) {
c.lastCheckpointMu.Lock()
c.lastCheckpoint = p
c.lastCheckpointMu.Unlock()
}

// Config returns the current config.
func (c *CheckpointAdvancer) Config() config.Config {
return c.cfg
}

// GetInResolvingLock is only used for tests.
func (c *CheckpointAdvancer) GetInResolvingLock() bool {
return c.inResolvingLock.Load()
}

// GetCheckpointInRange scans the regions in the range,
// collect them to the collector.
func (c *CheckpointAdvancer) GetCheckpointInRange(ctx context.Context, start, end []byte, collector *clusterCollector) error {
@@ -170,16 +238,35 @@ func tsoBefore(n time.Duration) uint64 {
return oracle.ComposeTS(now.UnixMilli()-n.Milliseconds(), 0)
}

func (c *CheckpointAdvancer) CalculateGlobalCheckpointLight(ctx context.Context, threshold time.Duration) (uint64, error) {
func tsoAfter(ts uint64, n time.Duration) uint64 {
return oracle.GoTimeToTS(oracle.GetTimeFromTS(ts).Add(n))
}

func (c *CheckpointAdvancer) WithCheckpoints(f func(*spans.ValueSortedFull)) {
c.checkpointsMu.Lock()
defer c.checkpointsMu.Unlock()

f(c.checkpoints)
}

// NewCheckpoints is only used for tests.
func (c *CheckpointAdvancer) NewCheckpoints(cps *spans.ValueSortedFull) {
c.checkpoints = cps
}

func (c *CheckpointAdvancer) CalculateGlobalCheckpointLight(ctx context.Context,
threshold time.Duration) (spans.Valued, error) {
var targets []spans.Valued
c.checkpoints.TraverseValuesLessThan(tsoBefore(threshold), func(v spans.Valued) bool {
targets = append(targets, v)
return true
var minValue spans.Valued
c.WithCheckpoints(func(vsf *spans.ValueSortedFull) {
c.checkpoints.TraverseValuesLessThan(tsoBefore(threshold), func(v spans.Valued) bool {
targets = append(targets, v)
return true
})
minValue = vsf.Min()
})
if len(targets) == 0 {
c.checkpointsMu.Lock()
defer c.checkpointsMu.Unlock()
return c.checkpoints.MinValue(), nil
return minValue, nil
}
samples := targets
if len(targets) > 3 {
@@ -191,12 +278,9 @@ func (c *CheckpointAdvancer) CalculateGlobalCheckpointLight(ctx context.Context,

err := c.tryAdvance(ctx, len(targets), func(i int) kv.KeyRange { return targets[i].Key })
if err != nil {
return 0, err
return minValue, err
}
c.checkpointsMu.Lock()
ts := c.checkpoints.MinValue()
c.checkpointsMu.Unlock()
return ts, nil
return minValue, nil
}
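For readers unfamiliar with the TSO layout: tsoBefore and tsoAfter only shift the physical (millisecond) part of a timestamp, which is what lets the advancer compare checkpoints against wall-clock thresholds. The following is a hypothetical standalone snippet (not part of the commit) showing the arithmetic, using only the client-go oracle helpers already imported above.

package example

import (
    "fmt"
    "time"

    "github.com/tikv/client-go/v2/oracle"
)

func main() {
    now := time.Now()
    ts := oracle.ComposeTS(now.UnixMilli(), 0)                                // "current" TSO with logical part 0
    before := oracle.ComposeTS(now.UnixMilli()-time.Minute.Milliseconds(), 0) // what tsoBefore(time.Minute) computes
    after := oracle.GoTimeToTS(oracle.GetTimeFromTS(ts).Add(time.Minute))     // what tsoAfter(ts, time.Minute) computes
    fmt.Println(before < ts, ts < after)                                      // true true
}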

func (c *CheckpointAdvancer) consumeAllTask(ctx context.Context, ch <-chan TaskEvent) error {
@@ -288,8 +372,9 @@ func (c *CheckpointAdvancer) onTaskEvent(ctx context.Context, e TaskEvent) error
c.task = e.Info
c.taskRange = spans.Collapse(len(e.Ranges), func(i int) kv.KeyRange { return e.Ranges[i] })
c.checkpoints = spans.Sorted(spans.NewFullWith(e.Ranges, 0))
c.lastCheckpoint = e.Info.StartTs
log.Info("added event", zap.Stringer("task", e.Info), zap.Stringer("ranges", logutil.StringifyKeys(c.taskRange)))
c.lastCheckpoint = newCheckpointWithTS(e.Info.StartTs)
log.Info("added event", zap.Stringer("task", e.Info),
zap.Stringer("ranges", logutil.StringifyKeys(c.taskRange)))
case EventDel:
utils.LogBackupTaskCountDec()
c.task = nil
@@ -310,33 +395,39 @@ func (c *CheckpointAdvancer) onTaskEvent(ctx context.Context, e TaskEvent) error
return nil
}

func (c *CheckpointAdvancer) setCheckpoint(cp uint64) bool {
if cp < c.lastCheckpoint {
log.Warn("failed to update global checkpoint: stale", zap.Uint64("old", c.lastCheckpoint), zap.Uint64("new", cp))
func (c *CheckpointAdvancer) setCheckpoint(ctx context.Context, s spans.Valued) bool {
cp := NewCheckpointWithSpan(s)
if cp.TS < c.lastCheckpoint.TS {
log.Warn("failed to update global checkpoint: stale",
zap.Uint64("old", c.lastCheckpoint.TS), zap.Uint64("new", cp.TS))
return false
}
if cp <= c.lastCheckpoint {
// Locks may need to be resolved for a different range at the same TS,
// so check both the range and the TS here.
if cp.equal(c.lastCheckpoint) {
return false
}
c.lastCheckpoint = cp
c.UpdateLastCheckpoint(cp)
metrics.LastCheckpoint.WithLabelValues(c.task.GetName()).Set(float64(c.lastCheckpoint.TS))
return true
}

// advanceCheckpointBy advances the checkpoint by a checkpoint getter function.
func (c *CheckpointAdvancer) advanceCheckpointBy(ctx context.Context, getCheckpoint func(context.Context) (uint64, error)) error {
func (c *CheckpointAdvancer) advanceCheckpointBy(ctx context.Context,
getCheckpoint func(context.Context) (spans.Valued, error)) error {
start := time.Now()
cp, err := getCheckpoint(ctx)
if err != nil {
return err
}

if c.setCheckpoint(cp) {
if c.setCheckpoint(ctx, cp) {
log.Info("uploading checkpoint for task",
zap.Stringer("checkpoint", oracle.GetTimeFromTS(cp)),
zap.Uint64("checkpoint", cp),
zap.Stringer("checkpoint", oracle.GetTimeFromTS(cp.Value)),
zap.Uint64("checkpoint", cp.Value),
zap.String("task", c.task.Name),
zap.Stringer("take", time.Since(start)))
metrics.LastCheckpoint.WithLabelValues(c.task.GetName()).Set(float64(c.lastCheckpoint))
metrics.LastCheckpoint.WithLabelValues(c.task.GetName()).Set(float64(c.lastCheckpoint.TS))
}
return nil
}
@@ -387,28 +478,46 @@ func (c *CheckpointAdvancer) subscribeTick(ctx context.Context) error {

func (c *CheckpointAdvancer) importantTick(ctx context.Context) error {
c.checkpointsMu.Lock()
c.setCheckpoint(c.checkpoints.MinValue())
c.setCheckpoint(ctx, c.checkpoints.Min())
c.checkpointsMu.Unlock()
if err := c.env.UploadV3GlobalCheckpointForTask(ctx, c.task.Name, c.lastCheckpoint); err != nil {
if err := c.env.UploadV3GlobalCheckpointForTask(ctx, c.task.Name, c.lastCheckpoint.TS); err != nil {
return errors.Annotate(err, "failed to upload global checkpoint")
}
return nil
}

func (c *CheckpointAdvancer) optionalTick(cx context.Context) error {
// lastCheckpoint has not advanced for too long.
// Assume the cluster has expired locks for whatever reason.
var targets []spans.Valued
if c.lastCheckpoint != nil && c.lastCheckpoint.needResolveLocks() && c.inResolvingLock.CompareAndSwap(false, true) {
c.WithCheckpoints(func(vsf *spans.ValueSortedFull) {
// When collecting locks here, assume they do not belong to the same txn,
// but that their start TSes are within about one minute; try to resolve them in one pass.
vsf.TraverseValuesLessThan(tsoAfter(c.lastCheckpoint.TS, time.Minute), func(v spans.Valued) bool {
targets = append(targets, v)
return true
})
})
if len(targets) != 0 {
log.Info("Advancer starts to resolve locks", zap.Int("targets", len(targets)))
// use new context here to avoid timeout
ctx := context.Background()
c.asyncResolveLocksForRanges(ctx, targets)
} else {
// don't forget to reset the state
c.inResolvingLock.Store(false)
}
}
threshold := c.Config().GetDefaultStartPollThreshold()
if err := c.subscribeTick(cx); err != nil {
log.Warn("[log backup advancer] Subscriber meet error, would polling the checkpoint.", logutil.ShortError(err))
threshold = c.Config().GetSubscriberErrorStartPollThreshold()
}

err := c.advanceCheckpointBy(cx, func(cx context.Context) (uint64, error) {
return c.advanceCheckpointBy(cx, func(cx context.Context) (spans.Valued, error) {
return c.CalculateGlobalCheckpointLight(cx, threshold)
})
if err != nil {
return err
}
return nil
}

func (c *CheckpointAdvancer) tick(ctx context.Context) error {
@@ -437,3 +546,48 @@ func (c *CheckpointAdvancer) tick(ctx context.Context) error {

return errs
}

func (c *CheckpointAdvancer) asyncResolveLocksForRanges(ctx context.Context, targets []spans.Valued) {
// run in another goroutine
// do not block main tick here
go func() {
failpoint.Inject("AsyncResolveLocks", func() {})
handler := func(ctx context.Context, r tikvstore.KeyRange) (rangetask.TaskStat, error) {
// we will scan all locks and try to resolve them by checking txn status.
return tikv.ResolveLocksForRange(
ctx, c.env, math.MaxUint64, r.StartKey, r.EndKey, tikv.NewGcResolveLockMaxBackoffer, tikv.GCScanLockLimit)
}
workerPool := utils.NewWorkerPool(uint(config.DefaultMaxConcurrencyAdvance), "advancer resolve locks")
var wg sync.WaitGroup
for _, r := range targets {
targetRange := r
wg.Add(1)
workerPool.Apply(func() {
defer wg.Done()
// Run resolve-locks on the whole TiKV cluster.
// It uses startKey/endKey to scan regions in PD,
// but the regionCache already has a codecPDClient, so just use decoded keys here.
// Each target almost always covers a single region, so set the concurrency to 1.
runner := rangetask.NewRangeTaskRunner("advancer-resolve-locks-runner",
c.env.GetStore(), 1, handler)
err := runner.RunOnRange(ctx, targetRange.Key.StartKey, targetRange.Key.EndKey)
if err != nil {
// wait for next tick
log.Warn("resolve locks failed, wait for next tick", zap.String("category", "advancer"),
zap.String("uuid", "log backup advancer"),
zap.Error(err))
}
})
}
wg.Wait()
log.Info("finish resolve locks for checkpoint", zap.String("category", "advancer"),
zap.String("uuid", "log backup advancer"),
logutil.Key("StartKey", c.lastCheckpoint.StartKey),
logutil.Key("EndKey", c.lastCheckpoint.EndKey),
zap.Int("targets", len(targets)))
c.lastCheckpointMu.Lock()
c.lastCheckpoint.resolveLockTime = time.Now()
c.lastCheckpointMu.Unlock()
c.inResolvingLock.Store(false)
}()
}
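The NeedResolveLocks and AsyncResolveLocks failpoints exist so tests can force the staleness gate open and synchronize with the background goroutine. Below is a hypothetical test helper (not part of the commit) sketching that wiring; it assumes the conventional failpoint path of <package import path>/<failpoint name>, an already-constructed advancer adv, and the testing, require, failpoint, and streamhelper imports.

func resolveLocksGateSketch(t *testing.T, adv *streamhelper.CheckpointAdvancer) {
    const prefix = "github.com/pingcap/tidb/br/pkg/streamhelper/"
    require.NoError(t, failpoint.Enable(prefix+"NeedResolveLocks", "return(true)")) // force the 3-minute gate open
    require.NoError(t, failpoint.Enable(prefix+"AsyncResolveLocks", "pause"))       // hold the background resolver
    defer func() {
        require.NoError(t, failpoint.Disable(prefix+"AsyncResolveLocks"))
        require.NoError(t, failpoint.Disable(prefix+"NeedResolveLocks"))
    }()
    // Tick the advancer here; while the resolver is paused, GetInResolvingLock()
    // should report true, and it flips back to false once the goroutine finishes.
    _ = adv.GetInResolvingLock()
}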
(Diffs for the remaining 11 changed files are not shown.)
