Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support to check whether the table can be duplicated by CDC #368

Merged
merged 15 commits into from
Mar 25, 2020
Merged
Show file tree
Hide file tree
Changes from 10 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 0 additions & 23 deletions cdc/capture.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@ package cdc

import (
"context"
"fmt"
"sync"
"time"

Expand All @@ -25,11 +24,7 @@ import (
"github.com/pingcap/ticdc/cdc/kv"
"github.com/pingcap/ticdc/cdc/model"
"github.com/pingcap/ticdc/cdc/roles"
"github.com/pingcap/ticdc/pkg/flags"
"github.com/pingcap/ticdc/pkg/util"
tidbkv "github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/store"
"github.com/pingcap/tidb/store/tikv"
"go.etcd.io/etcd/clientv3"
"go.etcd.io/etcd/clientv3/concurrency"
"go.etcd.io/etcd/mvcc"
Expand Down Expand Up @@ -189,21 +184,3 @@ func (c *Capture) Close(ctx context.Context) error {
// register writes this capture's info into etcd, bound to the capture's
// session lease so the record disappears automatically when the session ends.
func (c *Capture) register(ctx context.Context) error {
	return errors.Trace(c.etcdClient.PutCaptureInfo(ctx, c.info, c.session.Lease()))
}

// createTiStore creates a TiKV storage client from a comma-separated list of
// PD endpoint URLs. GC is disabled on the connection (disableGC=true) —
// presumably so historical MVCC versions stay readable; TODO confirm.
func createTiStore(urls string) (tidbkv.Storage, error) {
	urlv, err := flags.NewURLsValue(urls)
	if err != nil {
		return nil, errors.Trace(err)
	}

	// Ignore error if it is already registered.
	_ = store.Register("tikv", tikv.Driver{})

	tiPath := fmt.Sprintf("tikv://%s?disableGC=true", urlv.HostString())
	tiStore, err := store.New(tiPath)
	if err != nil {
		return nil, errors.Trace(err)
	}

	return tiStore, nil
}
5 changes: 5 additions & 0 deletions cdc/entry/schema_storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -213,6 +213,11 @@ func (ti *TableInfo) IsColumnUnique(colID int64) bool {
return exist
}

// ExistTableUniqueColumn reports whether the table has at least one unique column.
func (ti *TableInfo) ExistTableUniqueColumn() bool {
	return len(ti.UniqueColumns) > 0
}

// IsIndexUnique returns whether the index is unique
func (ti *TableInfo) IsIndexUnique(indexInfo *timodel.IndexInfo) bool {
if indexInfo.Primary {
Expand Down
85 changes: 83 additions & 2 deletions cdc/kv/store_op.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,16 +14,22 @@
package kv

import (
"fmt"
"sort"

"github.com/pingcap/ticdc/pkg/flags"
"github.com/pingcap/tidb/store"
"github.com/pingcap/tidb/store/helper"
"github.com/pingcap/tidb/store/tikv"
"github.com/pingcap/tidb/util/codec"

"github.com/pingcap/errors"
"github.com/pingcap/parser/model"
tidbkv "github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/meta"
)

// LoadHistoryDDLJobs loads all history DDL jobs from TiDB.
func LoadHistoryDDLJobs(tiStore tidbkv.Storage) ([]*model.Job, error) {
func loadHistoryDDLJobs(tiStore tidbkv.Storage) ([]*model.Job, error) {
snapMeta, err := getSnapshotMeta(tiStore)
if err != nil {
return nil, errors.Trace(err)
Expand Down Expand Up @@ -52,3 +58,78 @@ func getSnapshotMeta(tiStore tidbkv.Storage) (*meta.Meta, error) {
}
return meta.NewSnapshotMeta(snapshot), nil
}

// LoadHistoryDDLJobs loads all history DDL jobs from TiDB.
func LoadHistoryDDLJobs(kvStore tidbkv.Storage) ([]*model.Job, error) {
originalJobs, err := loadHistoryDDLJobs(kvStore)
jobs := make([]*model.Job, 0, len(originalJobs))
if err != nil {
return nil, err
}
tikvStorage, ok := kvStore.(tikv.Storage)
for _, job := range originalJobs {
if job.State != model.JobStateSynced && job.State != model.JobStateDone {
continue
}
if ok {
err := resetFinishedTs(tikvStorage, job)
if err != nil {
return nil, errors.Trace(err)
}
}
jobs = append(jobs, job)
}
return jobs, nil
}

// resetFinishedTs overwrites job.BinlogInfo.FinishedTS with the largest
// CommitTs found among the MVCC write records of the schema-diff key for the
// job's schema version. Returns a NotFound error when no MVCC info exists for
// that key.
func resetFinishedTs(kvStore tikv.Storage, job *model.Job) error {
	// NOTE: the local used to be named `helper`, shadowing the helper package.
	h := helper.NewHelper(kvStore)
	diffKey := schemaDiffKey(job.BinlogInfo.SchemaVersion)
	resp, err := h.GetMvccByEncodedKey(diffKey)
	if err != nil {
		return errors.Trace(err)
	}
	mvcc := resp.GetInfo()
	if mvcc == nil || len(mvcc.Writes) == 0 {
		return errors.NotFoundf("mvcc info, ddl job id: %d, schema version: %d", job.ID, job.BinlogInfo.SchemaVersion)
	}
	// The latest commit on the schema-diff key marks when the DDL finished.
	var finishedTS uint64
	for _, w := range mvcc.Writes {
		if w.CommitTs > finishedTS {
			finishedTS = w.CommitTs
		}
	}
	job.BinlogInfo.FinishedTS = finishedTS
	return nil
}

// CreateTiStore creates a new tikv storage client from a comma-separated list
// of PD endpoint URLs, with GC disabled on the connection.
func CreateTiStore(urls string) (tidbkv.Storage, error) {
	urlValues, err := flags.NewURLsValue(urls)
	if err != nil {
		return nil, errors.Trace(err)
	}

	// Registering an already-registered driver returns an error; safe to ignore.
	_ = store.Register("tikv", tikv.Driver{})

	tiStore, err := store.New(fmt.Sprintf("tikv://%s?disableGC=true", urlValues.HostString()))
	if err != nil {
		return nil, errors.Trace(err)
	}
	return tiStore, nil
}

// schemaDiffKey builds the encoded meta key under which TiDB stores the
// schema diff for the given schema version: the "m" meta prefix, the
// codec-escaped key "Diff:<version>", and the string-data type flag.
func schemaDiffKey(schemaVersion int64) []byte {
	// These are fixed protocol values, so declare them as constants; the old
	// local `StringData` also violated Go naming conventions for locals.
	const (
		metaPrefix        = "m"
		mSchemaDiffPrefix = "Diff"
		stringData        = 's' // meta value-type flag for string data
	)
	key := []byte(fmt.Sprintf("%s:%d", mSchemaDiffPrefix, schemaVersion))

	// Reserve room for the prefix, the codec-escaped key, and the type flag.
	ek := make([]byte, 0, len(metaPrefix)+len(key)+24)
	ek = append(ek, metaPrefix...)
	ek = codec.EncodeBytes(ek, key)
	return codec.EncodeUint(ek, uint64(stringData))
}
13 changes: 13 additions & 0 deletions cdc/model/sink.go
Original file line number Diff line number Diff line change
Expand Up @@ -122,3 +122,16 @@ func (e *DDLEvent) FromMqMessage(key *MqMessageKey, value *MqMessageDDL) {
e.Type = value.Type
e.Query = value.Query
}

// FromJob fills the values of DDLEvent from DDL job
// FromJob fills the values of DDLEvent from a DDL job. The table name is
// taken from the job's binlog table info when present, otherwise left empty.
func (e *DDLEvent) FromJob(job *model.Job) {
	e.Ts = job.BinlogInfo.FinishedTS
	e.Query = job.Query
	e.Schema = job.SchemaName
	e.Type = job.Type
	if info := job.BinlogInfo.TableInfo; info != nil {
		e.Table = info.Name.O
	} else {
		e.Table = ""
	}
}
68 changes: 41 additions & 27 deletions cdc/owner.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"fmt"
"io"
"math"
"strings"
"sync"
"time"

Expand Down Expand Up @@ -317,12 +318,34 @@ func (c *changeFeed) banlanceOrphanTables(ctx context.Context, captures map[stri
}
}

func (c *changeFeed) applyJob(job *timodel.Job) error {
func (c *changeFeed) checkJob(job *timodel.Job) (skip bool) {
switch job.Type {
case timodel.ActionCreateTable:
tableInfo := entry.WrapTableInfo(job.BinlogInfo.TableInfo)
log.Warn("this table is not eligible to duplicate", zap.Reflect("job", job))
zier-one marked this conversation as resolved.
Show resolved Hide resolved
return !tableInfo.ExistTableUniqueColumn()
case timodel.ActionDropColumn, timodel.ActionDropIndex, timodel.ActionDropPrimaryKey:
tableInfo := entry.WrapTableInfo(job.BinlogInfo.TableInfo)
if tableInfo.ExistTableUniqueColumn() {
return false
}
log.Warn("this table is not eligible to duplicate, stop to duplicate this table", zap.Reflect("job", job))
c.removeTable(uint64(job.SchemaID), uint64(job.TableID))
return true
}
return false
}

func (c *changeFeed) applyJob(job *timodel.Job) (skip bool, err error) {
log.Info("apply job", zap.String("sql", job.Query), zap.Stringer("job", job))

schamaName, tableName, _, err := c.schema.HandleDDL(job)
if err != nil {
return errors.Trace(err)
return false, errors.Trace(err)
}

if c.checkJob(job) {
return true, nil
}

schemaID := uint64(job.SchemaID)
Expand All @@ -349,7 +372,7 @@ func (c *changeFeed) applyJob(job *timodel.Job) error {
c.addTable(schemaID, addID, job.BinlogInfo.FinishedTS, entry.TableName{Schema: schamaName, Table: tableName})
}

return nil
return false, nil
}

type ownerImpl struct {
Expand Down Expand Up @@ -517,7 +540,12 @@ func (o *ownerImpl) newChangeFeed(
log.Info("Find new changefeed", zap.Reflect("info", info),
zap.String("id", id), zap.Uint64("checkpoint ts", checkpointTs))

jobs, err := getHistoryDDLJobs(o.pdEndpoints)
// TODO: here we create another pd client; we should reuse them
kvStore, err := kv.CreateTiStore(strings.Join(o.pdEndpoints, ","))
if err != nil {
return nil, err
}
jobs, err := kv.LoadHistoryDDLJobs(kvStore)
if err != nil {
return nil, errors.Trace(err)
}
Expand Down Expand Up @@ -823,40 +851,26 @@ func (c *changeFeed) handleDDL(ctx context.Context, captures map[string]*model.C
return nil
}
}

ddlEvent := new(model.DDLEvent)
ddlEvent.FromJob(todoDDLJob)
// Execute DDL Job asynchronously
c.ddlState = model.ChangeFeedExecDDL
log.Debug("apply job", zap.Stringer("job", todoDDLJob),
zap.String("schema", todoDDLJob.SchemaName),
zap.String("query", todoDDLJob.Query),
zap.Uint64("ts", todoDDLJob.BinlogInfo.FinishedTS))

var tableName, schemaName string
if todoDDLJob.BinlogInfo.TableInfo != nil {
tableName = todoDDLJob.BinlogInfo.TableInfo.Name.O
}
// TODO consider some newly added DDL types such as `ActionCreateSequence`
if todoDDLJob.Type != timodel.ActionCreateSchema {
dbInfo, exist := c.schema.SchemaByID(todoDDLJob.SchemaID)
if !exist {
return errors.NotFoundf("schema %d not found", todoDDLJob.SchemaID)
}
schemaName = dbInfo.Name.O
} else {
schemaName = todoDDLJob.BinlogInfo.DBInfo.Name.O
}
ddlEvent := &model.DDLEvent{
Ts: todoDDLJob.BinlogInfo.FinishedTS,
Query: todoDDLJob.Query,
Schema: schemaName,
Table: tableName,
Type: todoDDLJob.Type,
}

err := c.applyJob(todoDDLJob)
skip, err := c.applyJob(todoDDLJob)
if err != nil {
return errors.Trace(err)
}
if skip {
c.ddlJobHistory = c.ddlJobHistory[1:]
c.ddlExecutedTs = todoDDLJob.BinlogInfo.FinishedTS
c.ddlState = model.ChangeFeedSyncDML
return nil
}

c.banlanceOrphanTables(ctx, captures)

Expand Down
68 changes: 6 additions & 62 deletions cdc/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@ import (
"github.com/google/uuid"
"github.com/pingcap/errors"
"github.com/pingcap/log"
timodel "github.com/pingcap/parser/model"
pd "github.com/pingcap/pd/client"
"github.com/pingcap/ticdc/cdc/entry"
"github.com/pingcap/ticdc/cdc/kv"
Expand All @@ -38,10 +37,7 @@ import (
"github.com/pingcap/ticdc/cdc/sink"
"github.com/pingcap/ticdc/pkg/retry"
"github.com/pingcap/ticdc/pkg/util"
"github.com/pingcap/tidb/store/helper"
"github.com/pingcap/tidb/store/tikv"
"github.com/pingcap/tidb/store/tikv/oracle"
"github.com/pingcap/tidb/util/codec"
"go.etcd.io/etcd/clientv3"
"go.etcd.io/etcd/clientv3/concurrency"
"go.uber.org/zap"
Expand Down Expand Up @@ -553,69 +549,17 @@ func (p *processor) syncResolved(ctx context.Context) error {
}

func createSchemaBuilder(pdEndpoints []string, ddlEventCh <-chan *model.RawKVEntry) (*entry.StorageBuilder, error) {
jobs, err := getHistoryDDLJobs(pdEndpoints)
if err != nil {
return nil, errors.Trace(err)
}
builder := entry.NewStorageBuilder(jobs, ddlEventCh)
return builder, nil
}

func getHistoryDDLJobs(pdEndpoints []string) ([]*timodel.Job, error) {
// TODO: here we create another pd client; we should reuse them
kvStore, err := createTiStore(strings.Join(pdEndpoints, ","))
kvStore, err := kv.CreateTiStore(strings.Join(pdEndpoints, ","))
if err != nil {
return nil, err
}
originalJobs, err := kv.LoadHistoryDDLJobs(kvStore)
jobs := make([]*timodel.Job, 0, len(originalJobs))
if err != nil {
return nil, err
}
for _, job := range originalJobs {
if job.State != timodel.JobStateSynced && job.State != timodel.JobStateDone {
continue
}
err := resetFinishedTs(kvStore.(tikv.Storage), job)
if err != nil {
return nil, errors.Trace(err)
}
jobs = append(jobs, job)
return nil, errors.Trace(err)
}
return jobs, nil
}

func resetFinishedTs(kvStore tikv.Storage, job *timodel.Job) error {
helper := helper.NewHelper(kvStore)
diffKey := schemaDiffKey(job.BinlogInfo.SchemaVersion)
resp, err := helper.GetMvccByEncodedKey(diffKey)
jobs, err := kv.LoadHistoryDDLJobs(kvStore)
if err != nil {
return errors.Trace(err)
}
mvcc := resp.GetInfo()
if mvcc == nil || len(mvcc.Writes) == 0 {
return errors.NotFoundf("mvcc info, ddl job id: %d, schema version: %d", job.ID, job.BinlogInfo.SchemaVersion)
}
var finishedTS uint64
for _, w := range mvcc.Writes {
if finishedTS < w.CommitTs {
finishedTS = w.CommitTs
}
return nil, errors.Trace(err)
}
job.BinlogInfo.FinishedTS = finishedTS
return nil
}

func schemaDiffKey(schemaVersion int64) []byte {
metaPrefix := []byte("m")
mSchemaDiffPrefix := "Diff"
StringData := 's'
key := []byte(fmt.Sprintf("%s:%d", mSchemaDiffPrefix, schemaVersion))

ek := make([]byte, 0, len(metaPrefix)+len(key)+24)
ek = append(ek, metaPrefix...)
ek = codec.EncodeBytes(ek, key)
return codec.EncodeUint(ek, uint64(StringData))
builder := entry.NewStorageBuilder(jobs, ddlEventCh)
return builder, nil
}

func createTsRWriter(cli kv.CDCEtcdClient, changefeedID, captureID string) (storage.ProcessorTsRWriter, error) {
Expand Down
Loading