Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ddl: remove expired keys on PD (#10406) #11014

Merged
merged 7 commits into from
Jul 5, 2019
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 17 additions & 5 deletions ddl/ddl.go
Original file line number Diff line number Diff line change
Expand Up @@ -268,7 +268,7 @@ type DDL interface {
// RegisterEventCh registers event channel for ddl.
RegisterEventCh(chan<- *util.Event)
// SchemaSyncer gets the schema syncer.
SchemaSyncer() SchemaSyncer
SchemaSyncer() util.SchemaSyncer
// OwnerManager gets the owner manager.
OwnerManager() owner.Manager
// GetID gets the ddl ID.
Expand Down Expand Up @@ -297,7 +297,7 @@ type ddlCtx struct {
uuid string
store kv.Storage
ownerManager owner.Manager
schemaSyncer SchemaSyncer
schemaSyncer util.SchemaSyncer
ddlJobDoneCh chan struct{}
ddlEventCh chan<- *util.Event
lease time.Duration // lease is schema lease.
Expand Down Expand Up @@ -364,15 +364,15 @@ func newDDL(ctx context.Context, etcdCli *clientv3.Client, store kv.Storage,
id := uuid.NewV4().String()
ctx, cancelFunc := context.WithCancel(ctx)
var manager owner.Manager
var syncer SchemaSyncer
var syncer util.SchemaSyncer
if etcdCli == nil {
// The etcdCli is nil if the store is localstore which is only used for testing.
// So we use mockOwnerManager and MockSchemaSyncer.
manager = owner.NewMockManager(id, cancelFunc)
syncer = NewMockSchemaSyncer()
} else {
manager = owner.NewOwnerManager(etcdCli, ddlPrompt, id, DDLOwnerKey, cancelFunc)
syncer = NewSchemaSyncer(etcdCli, id)
syncer = util.NewSchemaSyncer(etcdCli, id, manager)
}

ddlCtx := &ddlCtx{
Expand Down Expand Up @@ -455,6 +455,17 @@ func (d *ddl) start(ctx context.Context, ctxPool *pools.ResourcePool) {
// checks owner firstly and try to find whether a job exists and run.
asyncNotify(worker.ddlJobCh)
}

go tidbutil.WithRecovery(
func() { d.schemaSyncer.StartCleanWork() },
func(r interface{}) {
if r != nil {
logutil.Logger(ddlLogCtx).Error("[ddl] DDL syncer clean worker meet panic",
zap.String("ID", d.uuid), zap.Reflect("r", r), zap.Stack("stack trace"))
metrics.PanicCounter.WithLabelValues(metrics.LabelDDLSyncer).Inc()
}
})
metrics.DDLCounter.WithLabelValues(fmt.Sprintf("%s", metrics.StartCleanWork)).Inc()
}
}

Expand All @@ -466,6 +477,7 @@ func (d *ddl) close() {
startTime := time.Now()
close(d.quitCh)
d.ownerManager.Cancel()
d.schemaSyncer.CloseCleanWork()
err := d.schemaSyncer.RemoveSelfVersionPath()
if err != nil {
logutil.Logger(ddlLogCtx).Error("[ddl] remove self version path failed", zap.Error(err))
Expand Down Expand Up @@ -528,7 +540,7 @@ func (d *ddl) genGlobalIDs(count int) ([]int64, error) {
}

// SchemaSyncer implements DDL.SchemaSyncer interface.
func (d *ddl) SchemaSyncer() SchemaSyncer {
func (d *ddl) SchemaSyncer() util.SchemaSyncer {
return d.schemaSyncer
}

Expand Down
4 changes: 4 additions & 0 deletions ddl/ddl_worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,8 @@ var (
RunWorker = true
// ddlWorkerID is used for generating the next DDL worker ID.
ddlWorkerID = int32(0)
// WaitTimeWhenErrorOccured is waiting interval when processing DDL jobs encounter errors.
WaitTimeWhenErrorOccured = 1 * time.Second
)

type workerType byte
Expand Down Expand Up @@ -645,6 +647,8 @@ func (w *worker) waitSchemaChanged(ctx context.Context, d *ddlCtx, waitTime time
if terror.ErrorEqual(err, context.DeadlineExceeded) {
return
}
d.schemaSyncer.NotifyCleanExpiredPaths()
// Wait until timeout.
select {
case <-ctx.Done():
return
Expand Down
14 changes: 12 additions & 2 deletions ddl/mock.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,11 @@ import (
"github.com/pingcap/errors"
"github.com/pingcap/parser/ast"
"github.com/pingcap/parser/model"
"github.com/pingcap/tidb/ddl/util"
"github.com/pingcap/tidb/sessionctx"
)

var _ SchemaSyncer = &MockSchemaSyncer{}
var _ util.SchemaSyncer = &MockSchemaSyncer{}

const mockCheckVersInterval = 2 * time.Millisecond

Expand All @@ -37,7 +38,7 @@ type MockSchemaSyncer struct {
}

// NewMockSchemaSyncer creates a new mock SchemaSyncer.
func NewMockSchemaSyncer() SchemaSyncer {
func NewMockSchemaSyncer() util.SchemaSyncer {
return &MockSchemaSyncer{}
}

Expand Down Expand Up @@ -113,6 +114,15 @@ func (s *MockSchemaSyncer) OwnerCheckAllVersions(ctx context.Context, latestVer
}
}

// NotifyCleanExpiredPaths implements SchemaSyncer.NotifyCleanExpiredPaths interface.
func (s *MockSchemaSyncer) NotifyCleanExpiredPaths() bool { return true }

// StartCleanWork implements SchemaSyncer.StartCleanWork interface.
func (s *MockSchemaSyncer) StartCleanWork() {}

// CloseCleanWork implements SchemaSyncer.CloseCleanWork interface.
func (s *MockSchemaSyncer) CloseCleanWork() {}

type mockDelRange struct {
}

Expand Down
138 changes: 132 additions & 6 deletions ddl/syncer.go → ddl/util/syncer.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.

package ddl
package util

import (
"context"
Expand All @@ -25,7 +25,9 @@ import (

"github.com/coreos/etcd/clientv3"
"github.com/coreos/etcd/clientv3/concurrency"
"github.com/coreos/etcd/etcdserver/api/v3rpc/rpctypes"
"github.com/pingcap/errors"
"github.com/pingcap/parser/terror"
"github.com/pingcap/tidb/metrics"
"github.com/pingcap/tidb/owner"
"github.com/pingcap/tidb/util/logutil"
Expand All @@ -48,6 +50,8 @@ const (
keyOpDefaultTimeout = 2 * time.Second
keyOpRetryInterval = 30 * time.Millisecond
checkVersInterval = 20 * time.Millisecond

ddlPrompt = "ddl-syncer"
)

var (
Expand All @@ -57,8 +61,8 @@ var (
// SyncerSessionTTL is the etcd session's TTL in seconds.
// and it's an exported variable for testing.
SyncerSessionTTL = 90
// WaitTimeWhenErrorOccured is waiting interval when processing DDL jobs encounter errors.
WaitTimeWhenErrorOccured = 1 * time.Second
// ddlLogCtx uses for log.
ddlLogCtx = context.Background()
)

// SchemaSyncer is used to synchronize schema version between the DDL worker leader and followers through etcd.
Expand Down Expand Up @@ -86,6 +90,17 @@ type SchemaSyncer interface {
// the latest schema version. If the result is false, wait for a while and check again util the processing time reach 2 * lease.
// It returns until all servers' versions are equal to the latest version or the ctx is done.
OwnerCheckAllVersions(ctx context.Context, latestVer int64) error
// NotifyCleanExpiredPaths informs to clean up expired paths.
// The returned value is used for testing.
NotifyCleanExpiredPaths() bool
// StartCleanWork starts to clean up tasks.
StartCleanWork()
// CloseCleanWork ends cleanup tasks.
CloseCleanWork()
}

type ownerChecker interface {
IsOwner() bool
}

type schemaVersionSyncer struct {
Expand All @@ -96,13 +111,21 @@ type schemaVersionSyncer struct {
sync.RWMutex
globalVerCh clientv3.WatchChan
}

// for clean worker
ownerChecker ownerChecker
notifyCleanExpiredPathsCh chan struct{}
quiteCh chan struct{}
}

// NewSchemaSyncer creates a new SchemaSyncer.
func NewSchemaSyncer(etcdCli *clientv3.Client, id string) SchemaSyncer {
func NewSchemaSyncer(etcdCli *clientv3.Client, id string, oc ownerChecker) SchemaSyncer {
return &schemaVersionSyncer{
etcdCli: etcdCli,
selfSchemaVerPath: fmt.Sprintf("%s/%s", DDLAllSchemaVersions, id),
etcdCli: etcdCli,
selfSchemaVerPath: fmt.Sprintf("%s/%s", DDLAllSchemaVersions, id),
ownerChecker: oc,
notifyCleanExpiredPathsCh: make(chan struct{}, 1),
quiteCh: make(chan struct{}),
}
}

Expand Down Expand Up @@ -380,3 +403,106 @@ func (s *schemaVersionSyncer) OwnerCheckAllVersions(ctx context.Context, latestV
time.Sleep(checkVersInterval)
}
}

const (
opDefaultRetryCnt = 10
failedGetTTLLimit = 20
opDefaultTimeout = 3 * time.Second
opRetryInterval = 500 * time.Millisecond
)

// NeededCleanTTL is exported for testing.
var NeededCleanTTL = int64(-60)

func (s *schemaVersionSyncer) StartCleanWork() {
for {
select {
case <-s.notifyCleanExpiredPathsCh:
if !s.ownerChecker.IsOwner() {
continue
}

for i := 0; i < opDefaultRetryCnt; i++ {
childCtx, cancelFunc := context.WithTimeout(context.Background(), opDefaultTimeout)
resp, err := s.etcdCli.Leases(childCtx)
cancelFunc()
if err != nil {
logutil.Logger(ddlLogCtx).Info("[ddl] syncer clean expired paths, failed to get leases.", zap.Error(err))
continue
}

if isFinished := s.doCleanExpirePaths(resp.Leases); isFinished {
break
}
time.Sleep(opRetryInterval)
}
case <-s.quiteCh:
return
}
}
}

func (s *schemaVersionSyncer) CloseCleanWork() {
close(s.quiteCh)
}

func (s *schemaVersionSyncer) NotifyCleanExpiredPaths() bool {
var isNotified bool
var err error
startTime := time.Now()
select {
case s.notifyCleanExpiredPathsCh <- struct{}{}:
isNotified = true
default:
err = errors.New("channel is full, failed to notify clean expired paths")
}
metrics.OwnerHandleSyncerHistogram.WithLabelValues(metrics.OwnerNotifyCleanExpirePaths, metrics.RetLabel(err)).Observe(time.Since(startTime).Seconds())
return isNotified
}

func (s *schemaVersionSyncer) doCleanExpirePaths(leases []clientv3.LeaseStatus) bool {
failedGetIDs := 0
failedRevokeIDs := 0
startTime := time.Now()

defer func() {
metrics.OwnerHandleSyncerHistogram.WithLabelValues(metrics.OwnerCleanExpirePaths, metrics.RetLabel(nil)).Observe(time.Since(startTime).Seconds())
}()
// TODO: Now LeaseStatus only has lease ID.
for _, lease := range leases {
// The DDL owner key uses '%x', so here print it too.
leaseID := fmt.Sprintf("%x, %d", lease.ID, lease.ID)
childCtx, cancelFunc := context.WithTimeout(context.Background(), opDefaultTimeout)
ttlResp, err := s.etcdCli.TimeToLive(childCtx, lease.ID)
cancelFunc()
if err != nil {
logutil.Logger(ddlLogCtx).Info("[ddl] syncer clean expired paths, failed to get one TTL.", zap.String("leaseID", leaseID), zap.Error(err))
failedGetIDs++
continue
}

if failedGetIDs > failedGetTTLLimit {
return false
}
if ttlResp.TTL >= NeededCleanTTL {
continue
}

st := time.Now()
childCtx, cancelFunc = context.WithTimeout(context.Background(), opDefaultTimeout)
_, err = s.etcdCli.Revoke(childCtx, lease.ID)
cancelFunc()
if err != nil && terror.ErrorEqual(err, rpctypes.ErrLeaseNotFound) {
logutil.Logger(ddlLogCtx).Warn("[ddl] syncer clean expired paths, failed to revoke lease.", zap.String("leaseID", leaseID),
zap.Int64("TTL", ttlResp.TTL), zap.Error(err))
failedRevokeIDs++
}
logutil.Logger(ddlLogCtx).Warn("[ddl] syncer clean expired paths,", zap.String("leaseID", leaseID), zap.Int64("TTL", ttlResp.TTL))
metrics.OwnerHandleSyncerHistogram.WithLabelValues(metrics.OwnerCleanOneExpirePath, metrics.RetLabel(err)).Observe(time.Since(st).Seconds())
}

if failedGetIDs == 0 && failedRevokeIDs == 0 {
return true
}
return false
}
Loading