Skip to content

Commit

Permalink
ddl: Remove expired keys on PD (#10406) (#11023)
Browse files Browse the repository at this point in the history
  • Loading branch information
zimulala authored Jul 4, 2019
1 parent 29f559a commit 842acd8
Show file tree
Hide file tree
Showing 10 changed files with 463 additions and 41 deletions.
22 changes: 17 additions & 5 deletions ddl/ddl.go
Original file line number Diff line number Diff line change
Expand Up @@ -234,7 +234,7 @@ type DDL interface {
// RegisterEventCh registers event channel for ddl.
RegisterEventCh(chan<- *util.Event)
// SchemaSyncer gets the schema syncer.
SchemaSyncer() SchemaSyncer
SchemaSyncer() util.SchemaSyncer
// OwnerManager gets the owner manager.
OwnerManager() owner.Manager
// GetID gets the ddl ID.
Expand All @@ -261,7 +261,7 @@ type ddlCtx struct {
uuid string
store kv.Storage
ownerManager owner.Manager
schemaSyncer SchemaSyncer
schemaSyncer util.SchemaSyncer
ddlJobDoneCh chan struct{}
ddlEventCh chan<- *util.Event
lease time.Duration // lease is schema lease.
Expand Down Expand Up @@ -327,15 +327,15 @@ func newDDL(ctx context.Context, etcdCli *clientv3.Client, store kv.Storage,
id := uuid.NewV4().String()
ctx, cancelFunc := context.WithCancel(ctx)
var manager owner.Manager
var syncer SchemaSyncer
var syncer util.SchemaSyncer
if etcdCli == nil {
// The etcdCli is nil if the store is localstore which is only used for testing.
// So we use mockOwnerManager and MockSchemaSyncer.
manager = owner.NewMockManager(id, cancelFunc)
syncer = NewMockSchemaSyncer()
} else {
manager = owner.NewOwnerManager(etcdCli, ddlPrompt, id, DDLOwnerKey, cancelFunc)
syncer = NewSchemaSyncer(etcdCli, id)
syncer = util.NewSchemaSyncer(etcdCli, id, manager)
}

ddlCtx := &ddlCtx{
Expand Down Expand Up @@ -403,6 +403,17 @@ func (d *ddl) start(ctx context.Context, ctxPool *pools.ResourcePool) {
// checks owner firstly and try to find whether a job exists and run.
asyncNotify(worker.ddlJobCh)
}

go tidbutil.WithRecovery(
func() { d.schemaSyncer.StartCleanWork() },
func(r interface{}) {
if r != nil {
logutil.Logger(ddlLogCtx).Error("[ddl] DDL syncer clean worker meet panic",
zap.String("ID", d.uuid), zap.Reflect("r", r), zap.Stack("stack trace"))
metrics.PanicCounter.WithLabelValues(metrics.LabelDDLSyncer).Inc()
}
})
metrics.DDLCounter.WithLabelValues(fmt.Sprintf("%s", metrics.StartCleanWork)).Inc()
}
}

Expand All @@ -414,6 +425,7 @@ func (d *ddl) close() {
startTime := time.Now()
close(d.quitCh)
d.ownerManager.Cancel()
d.schemaSyncer.CloseCleanWork()
err := d.schemaSyncer.RemoveSelfVersionPath()
if err != nil {
logutil.Logger(ddlLogCtx).Error("[ddl] remove self version path failed", zap.Error(err))
Expand Down Expand Up @@ -458,7 +470,7 @@ func (d *ddl) genGlobalID() (int64, error) {
}

// SchemaSyncer implements DDL.SchemaSyncer interface.
func (d *ddl) SchemaSyncer() SchemaSyncer {
func (d *ddl) SchemaSyncer() util.SchemaSyncer {
return d.schemaSyncer
}

Expand Down
4 changes: 4 additions & 0 deletions ddl/ddl_worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,8 @@ var (
RunWorker = true
// ddlWorkerID is used for generating the next DDL worker ID.
ddlWorkerID = int32(0)
// WaitTimeWhenErrorOccured is the waiting interval used when processing DDL jobs encounters errors.
WaitTimeWhenErrorOccured = 1 * time.Second
)

type workerType byte
Expand Down Expand Up @@ -595,6 +597,8 @@ func (w *worker) waitSchemaChanged(ctx context.Context, d *ddlCtx, waitTime time
if terror.ErrorEqual(err, context.DeadlineExceeded) {
return
}
d.schemaSyncer.NotifyCleanExpiredPaths()
// Wait until timeout.
select {
case <-ctx.Done():
return
Expand Down
14 changes: 12 additions & 2 deletions ddl/mock.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,12 @@ import (
"github.com/pingcap/errors"
"github.com/pingcap/parser/ast"
"github.com/pingcap/parser/model"
"github.com/pingcap/tidb/ddl/util"
"github.com/pingcap/tidb/sessionctx"
"golang.org/x/net/context"
)

var _ SchemaSyncer = &MockSchemaSyncer{}
var _ util.SchemaSyncer = &MockSchemaSyncer{}

const mockCheckVersInterval = 2 * time.Millisecond

Expand All @@ -37,7 +38,7 @@ type MockSchemaSyncer struct {
}

// NewMockSchemaSyncer creates a new mock SchemaSyncer.
func NewMockSchemaSyncer() SchemaSyncer {
func NewMockSchemaSyncer() util.SchemaSyncer {
return &MockSchemaSyncer{}
}

Expand Down Expand Up @@ -113,6 +114,15 @@ func (s *MockSchemaSyncer) OwnerCheckAllVersions(ctx context.Context, latestVer
}
}

// NotifyCleanExpiredPaths implements SchemaSyncer.NotifyCleanExpiredPaths interface.
// The mock performs no cleanup and always returns true.
func (s *MockSchemaSyncer) NotifyCleanExpiredPaths() bool { return true }

// StartCleanWork implements SchemaSyncer.StartCleanWork interface.
// The mock has no clean worker, so this is a no-op that returns immediately.
func (s *MockSchemaSyncer) StartCleanWork() {}

// CloseCleanWork implements SchemaSyncer.CloseCleanWork interface.
// The mock has no clean worker to stop, so this is a no-op.
func (s *MockSchemaSyncer) CloseCleanWork() {}

type mockDelRange struct {
}

Expand Down
138 changes: 132 additions & 6 deletions ddl/syncer.go → ddl/util/syncer.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.

package ddl
package util

import (
"fmt"
Expand All @@ -24,7 +24,9 @@ import (

"github.com/coreos/etcd/clientv3"
"github.com/coreos/etcd/clientv3/concurrency"
"github.com/coreos/etcd/etcdserver/api/v3rpc/rpctypes"
"github.com/pingcap/errors"
"github.com/pingcap/parser/terror"
"github.com/pingcap/tidb/metrics"
"github.com/pingcap/tidb/owner"
"github.com/pingcap/tidb/util/logutil"
Expand All @@ -48,6 +50,8 @@ const (
keyOpDefaultTimeout = 2 * time.Second
keyOpRetryInterval = 30 * time.Millisecond
checkVersInterval = 20 * time.Millisecond

ddlPrompt = "ddl-syncer"
)

var (
Expand All @@ -57,8 +61,8 @@ var (
// SyncerSessionTTL is the etcd session's TTL in seconds.
// and it's an exported variable for testing.
SyncerSessionTTL = 90
// WaitTimeWhenErrorOccured is waiting interval when processing DDL jobs encounter errors.
WaitTimeWhenErrorOccured = 1 * time.Second
// ddlLogCtx is the context used for logging.
ddlLogCtx = context.Background()
)

// SchemaSyncer is used to synchronize schema version between the DDL worker leader and followers through etcd.
Expand Down Expand Up @@ -86,6 +90,17 @@ type SchemaSyncer interface {
// the latest schema version. If the result is false, wait for a while and check again until the processing time reaches 2 * lease.
// It returns until all servers' versions are equal to the latest version or the ctx is done.
OwnerCheckAllVersions(ctx context.Context, latestVer int64) error
// NotifyCleanExpiredPaths informs to clean up expired paths.
// The returned value is used for testing.
NotifyCleanExpiredPaths() bool
// StartCleanWork starts to clean up tasks.
StartCleanWork()
// CloseCleanWork ends cleanup tasks.
CloseCleanWork()
}

// ownerChecker is the minimal view of an owner manager that the syncer needs.
// The clean worker calls IsOwner before cleaning and skips the work when it
// returns false.
type ownerChecker interface {
// IsOwner reports whether the current node is the owner.
IsOwner() bool
}

type schemaVersionSyncer struct {
Expand All @@ -96,13 +111,21 @@ type schemaVersionSyncer struct {
sync.RWMutex
globalVerCh clientv3.WatchChan
}

// for clean worker
ownerChecker ownerChecker
notifyCleanExpiredPathsCh chan struct{}
quiteCh chan struct{}
}

// NewSchemaSyncer creates a new SchemaSyncer.
func NewSchemaSyncer(etcdCli *clientv3.Client, id string) SchemaSyncer {
func NewSchemaSyncer(etcdCli *clientv3.Client, id string, oc ownerChecker) SchemaSyncer {
return &schemaVersionSyncer{
etcdCli: etcdCli,
selfSchemaVerPath: fmt.Sprintf("%s/%s", DDLAllSchemaVersions, id),
etcdCli: etcdCli,
selfSchemaVerPath: fmt.Sprintf("%s/%s", DDLAllSchemaVersions, id),
ownerChecker: oc,
notifyCleanExpiredPathsCh: make(chan struct{}, 1),
quiteCh: make(chan struct{}),
}
}

Expand Down Expand Up @@ -380,3 +403,106 @@ func (s *schemaVersionSyncer) OwnerCheckAllVersions(ctx context.Context, latestV
time.Sleep(checkVersInterval)
}
}

// Tuning knobs for the clean worker that revokes expired leases.
const (
// opDefaultRetryCnt is the maximum number of clean attempts made for one notification.
opDefaultRetryCnt = 10
// failedGetTTLLimit is the number of failed TTL lookups tolerated within one clean pass.
failedGetTTLLimit = 20
// opDefaultTimeout is the timeout applied to each individual etcd lease operation.
opDefaultTimeout = 3 * time.Second
// opRetryInterval is how long the clean worker sleeps between clean attempts.
opRetryInterval = 500 * time.Millisecond
)

// NeededCleanTTL is the lease TTL threshold used by the clean worker: a lease
// is revoked only when its reported TTL is below this value, and leases whose
// TTL is greater than or equal to it are skipped.
// It is a variable, and exported, so that tests can adjust it.
var NeededCleanTTL = int64(-60)

// StartCleanWork implements SchemaSyncer.StartCleanWork interface.
// It blocks, serving clean-up notifications, until quiteCh is closed
// (see CloseCleanWork), so callers are expected to run it in its own goroutine.
//
// For every notification received while this node is the owner, it lists all
// etcd leases and hands them to doCleanExpirePaths, retrying up to
// opDefaultRetryCnt times until a pass finishes cleanly. Notifications that
// arrive while this node is not the owner are dropped.
func (s *schemaVersionSyncer) StartCleanWork() {
	for {
		select {
		case <-s.notifyCleanExpiredPathsCh:
			if !s.ownerChecker.IsOwner() {
				continue
			}

			for i := 0; i < opDefaultRetryCnt; i++ {
				childCtx, cancelFunc := context.WithTimeout(context.Background(), opDefaultTimeout)
				resp, err := s.etcdCli.Leases(childCtx)
				cancelFunc()
				if err != nil {
					logutil.Logger(ddlLogCtx).Info("[ddl] syncer clean expired paths, failed to get leases.", zap.Error(err))
				} else if isFinished := s.doCleanExpirePaths(resp.Leases); isFinished {
					break
				}
				// Back off before the next attempt on every failure path. Without
				// this, a fast-failing Leases call would burn through all retries
				// back-to-back.
				time.Sleep(opRetryInterval)
			}
		case <-s.quiteCh:
			return
		}
	}
}

// CloseCleanWork implements SchemaSyncer.CloseCleanWork interface.
// It closes quiteCh, which makes the StartCleanWork loop return the next time
// it reaches its select. It must be called at most once: closing an
// already-closed channel panics.
func (s *schemaVersionSyncer) CloseCleanWork() {
close(s.quiteCh)
}

// NotifyCleanExpiredPaths implements SchemaSyncer.NotifyCleanExpiredPaths interface.
// It makes a non-blocking attempt to queue a clean-up request for the clean
// worker and never waits. The result is true when the request was queued and
// false when the notification channel was already full. Either outcome is
// recorded in the owner-handle-syncer histogram.
func (s *schemaVersionSyncer) NotifyCleanExpiredPaths() bool {
	begin := time.Now()

	var notifyErr error
	select {
	case s.notifyCleanExpiredPathsCh <- struct{}{}:
		// Queued; notifyErr stays nil.
	default:
		notifyErr = errors.New("channel is full, failed to notify clean expired paths")
	}

	elapsed := time.Since(begin).Seconds()
	metrics.OwnerHandleSyncerHistogram.WithLabelValues(metrics.OwnerNotifyCleanExpirePaths, metrics.RetLabel(notifyErr)).Observe(elapsed)
	return notifyErr == nil
}

// doCleanExpirePaths performs one clean pass over leases: it looks up the TTL
// of each lease and revokes those whose TTL is below NeededCleanTTL.
//
// It returns true only when every TTL lookup and every revoke succeeded, which
// tells the caller no retry is needed. It returns false when any lookup or
// revoke failed, and gives up on the pass early once more than
// failedGetTTLLimit TTL lookups have failed.
//
// A revoke that fails with rpctypes.ErrLeaseNotFound is not counted as a
// failure: the lease is already gone, which is the desired end state.
func (s *schemaVersionSyncer) doCleanExpirePaths(leases []clientv3.LeaseStatus) bool {
	failedGetIDs := 0
	failedRevokeIDs := 0
	startTime := time.Now()

	defer func() {
		metrics.OwnerHandleSyncerHistogram.WithLabelValues(metrics.OwnerCleanExpirePaths, metrics.RetLabel(nil)).Observe(time.Since(startTime).Seconds())
	}()
	// TODO: Now LeaseStatus only has lease ID.
	for _, lease := range leases {
		// The DDL owner key uses '%x', so here print it too.
		leaseID := fmt.Sprintf("%x, %d", lease.ID, lease.ID)
		childCtx, cancelFunc := context.WithTimeout(context.Background(), opDefaultTimeout)
		ttlResp, err := s.etcdCli.TimeToLive(childCtx, lease.ID)
		cancelFunc()
		if err != nil {
			logutil.Logger(ddlLogCtx).Info("[ddl] syncer clean expired paths, failed to get one TTL.", zap.String("leaseID", leaseID), zap.Error(err))
			failedGetIDs++
			// Check the limit where the counter grows. Placed after the
			// `continue`, it would only be evaluated on a later successful
			// lookup and never fire while lookups keep failing.
			if failedGetIDs > failedGetTTLLimit {
				return false
			}
			continue
		}

		if ttlResp.TTL >= NeededCleanTTL {
			continue
		}

		st := time.Now()
		childCtx, cancelFunc = context.WithTimeout(context.Background(), opDefaultTimeout)
		_, err = s.etcdCli.Revoke(childCtx, lease.ID)
		cancelFunc()
		// Only errors other than "lease not found" are real revoke failures.
		if err != nil && !terror.ErrorEqual(err, rpctypes.ErrLeaseNotFound) {
			logutil.Logger(ddlLogCtx).Warn("[ddl] syncer clean expired paths, failed to revoke lease.", zap.String("leaseID", leaseID),
				zap.Int64("TTL", ttlResp.TTL), zap.Error(err))
			failedRevokeIDs++
		}
		logutil.Logger(ddlLogCtx).Warn("[ddl] syncer clean expired paths,", zap.String("leaseID", leaseID), zap.Int64("TTL", ttlResp.TTL))
		metrics.OwnerHandleSyncerHistogram.WithLabelValues(metrics.OwnerCleanOneExpirePath, metrics.RetLabel(err)).Observe(time.Since(st).Seconds())
	}

	return failedGetIDs == 0 && failedRevokeIDs == 0
}
Loading

0 comments on commit 842acd8

Please sign in to comment.