Skip to content

Commit

Permalink
capture: implement the task watcher (#337)
Browse files Browse the repository at this point in the history
* capture: implement the task wather

A task watcher waits for new tasks and returns the
task event, than the capture can start a processor
to handle the task.

It is a replacement of the ChangeFeedWatcher and
ProcessorWatcher, the new solution would be more
clear and simpler.

Signed-off-by: Shafreeck Sea <shafreeck@gmail.com>

* owner: dispatch job to captures

Signed-off-by: Shafreeck Sea <shafreeck@gmail.com>

* task: init the cfg

Signed-off-by: Shafreeck Sea <shafreeck@gmail.com>

* task: fix when parsing a task key

Signed-off-by: Shafreeck Sea <shafreeck@gmail.com>

* task: watch in another goroutine

Signed-off-by: Shafreeck Sea <shafreeck@gmail.com>

* task: close the channel

Signed-off-by: Shafreeck Sea <shafreeck@gmail.com>

* task: refine the task persistent keys

Signed-off-by: Shafreeck Sea <shafreeck@gmail.com>

* task: add TaskStatusKeyPrefix

Signed-off-by: Shafreeck Sea <shafreeck@gmail.com>

* task: fix the status key

Signed-off-by: Shafreeck Sea <shafreeck@gmail.com>

* task: fix the path of task positions

Signed-off-by: Shafreeck Sea <shafreeck@gmail.com>

* task: fix the job etcd key

Signed-off-by: Shafreeck Sea <shafreeck@gmail.com>

* changefeed: fix the job path

Signed-off-by: Shafreeck Sea <shafreeck@gmail.com>

* build: fix go lint

Signed-off-by: Shafreeck Sea <shafreeck@gmail.com>

* task: support to handle the task commands

Signed-off-by: Shafreeck Sea <shafreeck@gmail.com>

* task: fix to return the correct task events

Signed-off-by: Shafreeck Sea <shafreeck@gmail.com>

* task: add unit tests for task watcher

Signed-off-by: Shafreeck Sea <shafreeck@gmail.com>

* task: add test case for resuming a task

Signed-off-by: Shafreeck Sea <shafreeck@gmail.com>

* processor: remove the task status/postion when stopped

Signed-off-by: Shafreeck Sea <shafreeck@gmail.com>

* capture: fix when handling TaskOpDelete

Signed-off-by: Shafreeck Sea <shafreeck@gmail.com>

* owner: remove unuseful log

Signed-off-by: Shafreeck Sea <shafreeck@gmail.com>

* tests: add availability integration tests

Signed-off-by: Shafreeck Sea <shafreeck@gmail.com>

* capture: remove unused functions

Signed-off-by: Shafreeck Sea <shafreeck@gmail.com>

* scheduler: log error message when running the processor

Signed-off-by: Shafreeck Sea <shafreeck@gmail.com>

* task: refine the task watcher and fix missing events

Signed-off-by: Shafreeck Sea <shafreeck@gmail.com>

* capture: return errors

Signed-off-by: Shafreeck Sea <shafreeck@gmail.com>

* processor: use info log level for context canceled

Signed-off-by: Shafreeck Sea <shafreeck@gmail.com>
  • Loading branch information
shafreeck authored Mar 25, 2020
1 parent cee8b68 commit 1198c55
Show file tree
Hide file tree
Showing 11 changed files with 770 additions and 635 deletions.
76 changes: 41 additions & 35 deletions cdc/capture.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,24 +26,20 @@ import (
"github.com/pingcap/ticdc/cdc/model"
"github.com/pingcap/ticdc/cdc/roles"
"github.com/pingcap/ticdc/pkg/flags"
"github.com/pingcap/ticdc/pkg/util"
tidbkv "github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/store"
"github.com/pingcap/tidb/store/tikv"
"go.etcd.io/etcd/clientv3"
"go.etcd.io/etcd/clientv3/concurrency"
"go.etcd.io/etcd/mvcc"
"go.uber.org/zap"
"golang.org/x/sync/errgroup"
"golang.org/x/time/rate"
"google.golang.org/grpc"
"google.golang.org/grpc/backoff"
)

const (
ownerRunInterval = time.Millisecond * 500
cfWatcherRetryDelay = time.Millisecond * 500
captureSessionTTL = 3
ownerRunInterval = time.Millisecond * 500
captureSessionTTL = 3
)

// Capture represents a Capture server, it monitors the changefeed information in etcd and schedules Task on it.
Expand Down Expand Up @@ -115,22 +111,6 @@ func NewCapture(pdEndpoints []string) (c *Capture, err error) {
return
}

var _ processorCallback = &Capture{}

// OnRunProcessor implements processorCallback.
func (c *Capture) OnRunProcessor(p *processor) {
c.processors[p.changefeedID] = p
}

// OnStopProcessor implements processorCallback.
func (c *Capture) OnStopProcessor(p *processor, err error) {
// TODO: handle processor error
log.Info("stop to run processor", zap.String("changefeed id", p.changefeedID), util.ZapErrorFilter(err, context.Canceled))
c.procLock.Lock()
defer c.procLock.Unlock()
delete(c.processors, p.changefeedID)
}

// Start starts the Capture mainloop
func (c *Capture) Start(ctx context.Context) (err error) {
// TODO: better channgefeed model with etcd storage
Expand All @@ -150,22 +130,48 @@ func (c *Capture) Start(ctx context.Context) (err error) {
return c.ownerWorker.Run(cctx, ownerRunInterval)
})

rl := rate.NewLimiter(0.1, 5)
watcher := NewChangeFeedWatcher(c.info.ID, c.pdEndpoints, c.etcdClient)
errg.Go(func() error {
for {
if !rl.Allow() {
return errors.New("changefeed watcher exceeds rate limit")
taskWatcher := NewTaskWatcher(c, &TaskWatcherConfig{
Prefix: kv.TaskStatusKeyPrefix + "/" + c.info.ID,
ChannelSize: 128,
})
log.Info("waiting for tasks", zap.String("captureid", c.info.ID))
for ev := range taskWatcher.Watch(ctx) {
if ev.Err != nil {
return errors.Trace(ev.Err)
}
task := ev.Task
if ev.Op == TaskOpCreate {
cf, err := c.etcdClient.GetChangeFeedInfo(ctx, task.ChangeFeedID)
if err != nil {
log.Error("get change feed info failed",
zap.String("changefeedid", task.ChangeFeedID),
zap.String("captureid", c.info.ID),
zap.Error(err))
return err
}
err := watcher.Watch(cctx, c)
if errors.Cause(err) == mvcc.ErrCompacted {
log.Warn("changefeed watcher watch retryable error", zap.Error(err))
time.Sleep(cfWatcherRetryDelay)
continue
log.Info("run processor", zap.String("captureid", c.info.ID),
zap.String("changefeedid", task.ChangeFeedID))
if _, ok := c.processors[task.ChangeFeedID]; !ok {
p, err := runProcessor(ctx, c.pdEndpoints, *cf, task.ChangeFeedID,
c.info.ID, task.CheckpointTS)
if err != nil {
log.Error("run processor failed",
zap.String("changefeedid", task.ChangeFeedID),
zap.String("captureid", c.info.ID),
zap.Error(err))
return err
}
c.processors[task.ChangeFeedID] = p
}
} else if ev.Op == TaskOpDelete {
if p, ok := c.processors[task.ChangeFeedID]; ok {
if err := p.stop(ctx); err != nil {
return errors.Trace(err)
}
delete(c.processors, task.ChangeFeedID)
}
return errors.Trace(err)
}
})
}

return errg.Wait()
}
Expand Down
63 changes: 47 additions & 16 deletions cdc/kv/etcd.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,18 @@ const (

// ProcessorInfoKeyPrefix is the processor info path that is saved to etcd
ProcessorInfoKeyPrefix = EtcdKeyBase + "/processor/info"

// TaskKeyPrefix is the prefix of task keys
TaskKeyPrefix = EtcdKeyBase + "/task"

// TaskStatusKeyPrefix is the prefix of task status keys
TaskStatusKeyPrefix = TaskKeyPrefix + "/status"

// TaskPositionKeyPrefix is the prefix of task position keys
TaskPositionKeyPrefix = TaskKeyPrefix + "/position"

// JobKeyPrefix is the prefix of job keys
JobKeyPrefix = EtcdKeyBase + "/job"
)

// GetEtcdKeyChangeFeedList returns the prefix key of all changefeed config
Expand All @@ -50,7 +62,7 @@ func GetEtcdKeyChangeFeedInfo(changefeedID string) string {

// GetEtcdKeyChangeFeedStatus returns the key of a changefeed status
func GetEtcdKeyChangeFeedStatus(changefeedID string) string {
return fmt.Sprintf("%s/changefeed/status/%s", EtcdKeyBase, changefeedID)
return GetEtcdKeyJob(changefeedID)
}

// GetEtcdKeyTaskStatusList returns the key of a task status without captureID part
Expand All @@ -63,14 +75,9 @@ func GetEtcdKeyTaskPositionList(changefeedID string) string {
return fmt.Sprintf("%s/changefeed/task/position/%s", EtcdKeyBase, changefeedID)
}

// GetEtcdKeyTaskStatus returns the key of a task status
func GetEtcdKeyTaskStatus(changefeedID, captureID string) string {
return fmt.Sprintf("%s/%s", GetEtcdKeyTaskStatusList(changefeedID), captureID)
}

// GetEtcdKeyTaskPosition returns the key of a task position
func GetEtcdKeyTaskPosition(changefeedID, captureID string) string {
return fmt.Sprintf("%s/%s", GetEtcdKeyTaskPositionList(changefeedID), captureID)
return TaskPositionKeyPrefix + "/" + captureID + "/" + changefeedID
}

// GetEtcdKeyCaptureInfo returns the key of a capture info
Expand All @@ -83,6 +90,16 @@ func GetEtcdKeyProcessorInfo(captureID, processorID string) string {
return ProcessorInfoKeyPrefix + "/" + captureID + "/" + processorID
}

// GetEtcdKeyTaskStatus returns the key for the task status
func GetEtcdKeyTaskStatus(changeFeedID, captureID string) string {
return TaskStatusKeyPrefix + "/" + captureID + "/" + changeFeedID
}

// GetEtcdKeyJob returns the key for a job status
func GetEtcdKeyJob(changeFeedID string) string {
return JobKeyPrefix + "/" + changeFeedID
}

// CDCEtcdClient is a wrap of etcd client
type CDCEtcdClient struct {
Client *clientv3.Client
Expand Down Expand Up @@ -143,7 +160,7 @@ func (c CDCEtcdClient) DeleteChangeFeedInfo(ctx context.Context, id string) erro

// GetChangeFeedStatus queries the checkpointTs and resovledTs of a given changefeed
func (c CDCEtcdClient) GetChangeFeedStatus(ctx context.Context, id string) (*model.ChangeFeedStatus, error) {
key := GetEtcdKeyChangeFeedStatus(id)
key := GetEtcdKeyJob(id)
resp, err := c.Client.Get(ctx, key)
if err != nil {
return nil, errors.Trace(err)
Expand Down Expand Up @@ -222,17 +239,24 @@ func (c CDCEtcdClient) SaveChangeFeedInfo(ctx context.Context, info *model.Chang
// GetAllTaskPositions queries all task positions of a changefeed, and returns a map
// mapping from captureID to TaskPositions
func (c CDCEtcdClient) GetAllTaskPositions(ctx context.Context, changefeedID string) (map[string]*model.TaskPosition, error) {
key := GetEtcdKeyTaskPositionList(changefeedID)
resp, err := c.Client.Get(ctx, key, clientv3.WithPrefix())
resp, err := c.Client.Get(ctx, TaskPositionKeyPrefix, clientv3.WithPrefix())
if err != nil {
return nil, errors.Trace(err)
}
positions := make(map[string]*model.TaskPosition, resp.Count)
for _, rawKv := range resp.Kvs {
captureID, err := util.ExtractKeySuffix(string(rawKv.Key))
changeFeed, err := util.ExtractKeySuffix(string(rawKv.Key))
if err != nil {
return nil, err
}
endIndex := len(rawKv.Key) - len(changeFeed) - 1
captureID, err := util.ExtractKeySuffix(string(rawKv.Key[0:endIndex]))
if err != nil {
return nil, err
}
if changeFeed != changefeedID {
continue
}
info := &model.TaskPosition{}
err = info.Unmarshal(rawKv.Value)
if err != nil {
Expand All @@ -246,17 +270,24 @@ func (c CDCEtcdClient) GetAllTaskPositions(ctx context.Context, changefeedID str
// GetAllTaskStatus queries all task status of a changefeed, and returns a map
// mapping from captureID to TaskStatus
func (c CDCEtcdClient) GetAllTaskStatus(ctx context.Context, changefeedID string) (model.ProcessorsInfos, error) {
key := GetEtcdKeyTaskStatusList(changefeedID)
resp, err := c.Client.Get(ctx, key, clientv3.WithPrefix())
resp, err := c.Client.Get(ctx, TaskStatusKeyPrefix, clientv3.WithPrefix())
if err != nil {
return nil, errors.Trace(err)
}
pinfo := make(map[string]*model.TaskStatus, resp.Count)
for _, rawKv := range resp.Kvs {
captureID, err := util.ExtractKeySuffix(string(rawKv.Key))
changeFeed, err := util.ExtractKeySuffix(string(rawKv.Key))
if err != nil {
return nil, err
}
endIndex := len(rawKv.Key) - len(changeFeed) - 1
captureID, err := util.ExtractKeySuffix(string(rawKv.Key[0:endIndex]))
if err != nil {
return nil, err
}
if changeFeed != changefeedID {
continue
}
info := &model.TaskStatus{}
err = info.Unmarshal(rawKv.Value)
if err != nil {
Expand Down Expand Up @@ -369,7 +400,7 @@ func (c CDCEtcdClient) PutChangeFeedStatus(
changefeedID string,
status *model.ChangeFeedStatus,
) error {
key := GetEtcdKeyChangeFeedStatus(changefeedID)
key := GetEtcdKeyJob(changefeedID)
value, err := status.Marshal()
if err != nil {
return errors.Trace(err)
Expand All @@ -389,7 +420,7 @@ func (c CDCEtcdClient) PutAllChangeFeedStatus(ctx context.Context, infos map[mod
if err != nil {
return errors.Trace(err)
}
key := GetEtcdKeyChangeFeedStatus(changefeedID)
key := GetEtcdKeyJob(changefeedID)
ops = append(ops, clientv3.OpPut(key, storeVal))
if uint(len(ops)) >= embed.DefaultMaxTxnOps {
_, err = txn.Then(ops...).Commit()
Expand Down
6 changes: 0 additions & 6 deletions cdc/owner.go
Original file line number Diff line number Diff line change
Expand Up @@ -1140,17 +1140,11 @@ func (o *ownerImpl) markProcessorDown(ctx context.Context,
// lookup the task position for the processor
pos, exist := positions[p.CaptureID]
if !exist {
log.Warn("unkown processor deletion detected",
zap.String("processorid", p.ID),
zap.String("captureid", p.CaptureID))
return nil
}
// lookup the task position for the processor
status, exist := statuses[p.CaptureID]
if !exist {
log.Warn("unkown processor deletion detected",
zap.String("processorid", p.ID),
zap.String("capture", p.CaptureID))
return nil
}
snap := status.Snapshot(p.ChangeFeedID,
Expand Down
8 changes: 8 additions & 0 deletions cdc/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -702,6 +702,14 @@ func (p *processor) stop(ctx context.Context) error {
}
p.tablesMu.Unlock()
p.session.Close()

if err := p.etcdCli.DeleteTaskPosition(ctx, p.changefeedID, p.captureID); err != nil {
return err
}
if err := p.etcdCli.DeleteTaskStatus(ctx, p.changefeedID, p.captureID); err != nil {
return err
}

return errors.Trace(p.deregister(ctx))
}

Expand Down
Loading

0 comments on commit 1198c55

Please sign in to comment.