diff --git a/cdc/capture.go b/cdc/capture.go index 05e3d9cee68..c9ad2dff9a4 100644 --- a/cdc/capture.go +++ b/cdc/capture.go @@ -26,24 +26,20 @@ import ( "github.com/pingcap/ticdc/cdc/model" "github.com/pingcap/ticdc/cdc/roles" "github.com/pingcap/ticdc/pkg/flags" - "github.com/pingcap/ticdc/pkg/util" tidbkv "github.com/pingcap/tidb/kv" "github.com/pingcap/tidb/store" "github.com/pingcap/tidb/store/tikv" "go.etcd.io/etcd/clientv3" "go.etcd.io/etcd/clientv3/concurrency" - "go.etcd.io/etcd/mvcc" "go.uber.org/zap" "golang.org/x/sync/errgroup" - "golang.org/x/time/rate" "google.golang.org/grpc" "google.golang.org/grpc/backoff" ) const ( - ownerRunInterval = time.Millisecond * 500 - cfWatcherRetryDelay = time.Millisecond * 500 - captureSessionTTL = 3 + ownerRunInterval = time.Millisecond * 500 + captureSessionTTL = 3 ) // Capture represents a Capture server, it monitors the changefeed information in etcd and schedules Task on it. @@ -115,22 +111,6 @@ func NewCapture(pdEndpoints []string) (c *Capture, err error) { return } -var _ processorCallback = &Capture{} - -// OnRunProcessor implements processorCallback. -func (c *Capture) OnRunProcessor(p *processor) { - c.processors[p.changefeedID] = p -} - -// OnStopProcessor implements processorCallback. -func (c *Capture) OnStopProcessor(p *processor, err error) { - // TODO: handle processor error - log.Info("stop to run processor", zap.String("changefeed id", p.changefeedID), util.ZapErrorFilter(err, context.Canceled)) - c.procLock.Lock() - defer c.procLock.Unlock() - delete(c.processors, p.changefeedID) -} - // Start starts the Capture mainloop func (c *Capture) Start(ctx context.Context) (err error) { // TODO: better channgefeed model with etcd storage @@ -150,22 +130,48 @@ func (c *Capture) Start(ctx context.Context) (err error) { return c.ownerWorker.Run(cctx, ownerRunInterval) }) - rl := rate.NewLimiter(0.1, 5) - watcher := NewChangeFeedWatcher(c.info.ID, c.pdEndpoints, c.etcdClient) - errg.Go(func() error { - for { - if !rl.Allow() { - return errors.New("changefeed watcher exceeds rate limit") + taskWatcher := NewTaskWatcher(c, &TaskWatcherConfig{ + Prefix: kv.TaskStatusKeyPrefix + "/" + c.info.ID, + ChannelSize: 128, + }) + log.Info("waiting for tasks", zap.String("captureid", c.info.ID)) + for ev := range taskWatcher.Watch(ctx) { + if ev.Err != nil { + return errors.Trace(ev.Err) + } + task := ev.Task + if ev.Op == TaskOpCreate { + cf, err := c.etcdClient.GetChangeFeedInfo(ctx, task.ChangeFeedID) + if err != nil { + log.Error("get change feed info failed", + zap.String("changefeedid", task.ChangeFeedID), + zap.String("captureid", c.info.ID), + zap.Error(err)) + return err } - err := watcher.Watch(cctx, c) - if errors.Cause(err) == mvcc.ErrCompacted { - log.Warn("changefeed watcher watch retryable error", zap.Error(err)) - time.Sleep(cfWatcherRetryDelay) - continue + log.Info("run processor", zap.String("captureid", c.info.ID), + zap.String("changefeedid", task.ChangeFeedID)) + if _, ok := c.processors[task.ChangeFeedID]; !ok { + p, err := runProcessor(ctx, c.pdEndpoints, *cf, task.ChangeFeedID, + c.info.ID, task.CheckpointTS) + if err != nil { + log.Error("run processor failed", + zap.String("changefeedid", task.ChangeFeedID), + zap.String("captureid", c.info.ID), + zap.Error(err)) + return err + } + c.processors[task.ChangeFeedID] = p + } + } else if ev.Op == TaskOpDelete { + if p, ok := c.processors[task.ChangeFeedID]; ok { + if err := p.stop(ctx); err != nil { + return 
errors.Trace(err) + } + delete(c.processors, task.ChangeFeedID) } - return errors.Trace(err) } - }) + } return errg.Wait() } diff --git a/cdc/kv/etcd.go b/cdc/kv/etcd.go index 6cba78f855e..d184a8fdf2b 100644 --- a/cdc/kv/etcd.go +++ b/cdc/kv/etcd.go @@ -36,6 +36,18 @@ const ( // ProcessorInfoKeyPrefix is the processor info path that is saved to etcd ProcessorInfoKeyPrefix = EtcdKeyBase + "/processor/info" + + // TaskKeyPrefix is the prefix of task keys + TaskKeyPrefix = EtcdKeyBase + "/task" + + // TaskStatusKeyPrefix is the prefix of task status keys + TaskStatusKeyPrefix = TaskKeyPrefix + "/status" + + // TaskPositionKeyPrefix is the prefix of task position keys + TaskPositionKeyPrefix = TaskKeyPrefix + "/position" + + // JobKeyPrefix is the prefix of job keys + JobKeyPrefix = EtcdKeyBase + "/job" ) // GetEtcdKeyChangeFeedList returns the prefix key of all changefeed config @@ -50,7 +62,7 @@ func GetEtcdKeyChangeFeedInfo(changefeedID string) string { // GetEtcdKeyChangeFeedStatus returns the key of a changefeed status func GetEtcdKeyChangeFeedStatus(changefeedID string) string { - return fmt.Sprintf("%s/changefeed/status/%s", EtcdKeyBase, changefeedID) + return GetEtcdKeyJob(changefeedID) } // GetEtcdKeyTaskStatusList returns the key of a task status without captureID part @@ -63,14 +75,9 @@ func GetEtcdKeyTaskPositionList(changefeedID string) string { return fmt.Sprintf("%s/changefeed/task/position/%s", EtcdKeyBase, changefeedID) } -// GetEtcdKeyTaskStatus returns the key of a task status -func GetEtcdKeyTaskStatus(changefeedID, captureID string) string { - return fmt.Sprintf("%s/%s", GetEtcdKeyTaskStatusList(changefeedID), captureID) -} - // GetEtcdKeyTaskPosition returns the key of a task position func GetEtcdKeyTaskPosition(changefeedID, captureID string) string { - return fmt.Sprintf("%s/%s", GetEtcdKeyTaskPositionList(changefeedID), captureID) + return TaskPositionKeyPrefix + "/" + captureID + "/" + changefeedID } // GetEtcdKeyCaptureInfo returns the key of a capture info @@ -83,6 +90,16 @@ func GetEtcdKeyProcessorInfo(captureID, processorID string) string { return ProcessorInfoKeyPrefix + "/" + captureID + "/" + processorID } +// GetEtcdKeyTaskStatus returns the key for the task status +func GetEtcdKeyTaskStatus(changeFeedID, captureID string) string { + return TaskStatusKeyPrefix + "/" + captureID + "/" + changeFeedID +} + +// GetEtcdKeyJob returns the key for a job status +func GetEtcdKeyJob(changeFeedID string) string { + return JobKeyPrefix + "/" + changeFeedID +} + // CDCEtcdClient is a wrap of etcd client type CDCEtcdClient struct { Client *clientv3.Client @@ -143,7 +160,7 @@ func (c CDCEtcdClient) DeleteChangeFeedInfo(ctx context.Context, id string) erro // GetChangeFeedStatus queries the checkpointTs and resovledTs of a given changefeed func (c CDCEtcdClient) GetChangeFeedStatus(ctx context.Context, id string) (*model.ChangeFeedStatus, error) { - key := GetEtcdKeyChangeFeedStatus(id) + key := GetEtcdKeyJob(id) resp, err := c.Client.Get(ctx, key) if err != nil { return nil, errors.Trace(err) @@ -222,17 +239,24 @@ func (c CDCEtcdClient) SaveChangeFeedInfo(ctx context.Context, info *model.Chang // GetAllTaskPositions queries all task positions of a changefeed, and returns a map // mapping from captureID to TaskPositions func (c CDCEtcdClient) GetAllTaskPositions(ctx context.Context, changefeedID string) (map[string]*model.TaskPosition, error) { - key := GetEtcdKeyTaskPositionList(changefeedID) - resp, err := c.Client.Get(ctx, key, clientv3.WithPrefix()) + resp, err := 
c.Client.Get(ctx, TaskPositionKeyPrefix, clientv3.WithPrefix()) if err != nil { return nil, errors.Trace(err) } positions := make(map[string]*model.TaskPosition, resp.Count) for _, rawKv := range resp.Kvs { - captureID, err := util.ExtractKeySuffix(string(rawKv.Key)) + changeFeed, err := util.ExtractKeySuffix(string(rawKv.Key)) if err != nil { return nil, err } + endIndex := len(rawKv.Key) - len(changeFeed) - 1 + captureID, err := util.ExtractKeySuffix(string(rawKv.Key[0:endIndex])) + if err != nil { + return nil, err + } + if changeFeed != changefeedID { + continue + } info := &model.TaskPosition{} err = info.Unmarshal(rawKv.Value) if err != nil { @@ -246,17 +270,24 @@ func (c CDCEtcdClient) GetAllTaskPositions(ctx context.Context, changefeedID str // GetAllTaskStatus queries all task status of a changefeed, and returns a map // mapping from captureID to TaskStatus func (c CDCEtcdClient) GetAllTaskStatus(ctx context.Context, changefeedID string) (model.ProcessorsInfos, error) { - key := GetEtcdKeyTaskStatusList(changefeedID) - resp, err := c.Client.Get(ctx, key, clientv3.WithPrefix()) + resp, err := c.Client.Get(ctx, TaskStatusKeyPrefix, clientv3.WithPrefix()) if err != nil { return nil, errors.Trace(err) } pinfo := make(map[string]*model.TaskStatus, resp.Count) for _, rawKv := range resp.Kvs { - captureID, err := util.ExtractKeySuffix(string(rawKv.Key)) + changeFeed, err := util.ExtractKeySuffix(string(rawKv.Key)) if err != nil { return nil, err } + endIndex := len(rawKv.Key) - len(changeFeed) - 1 + captureID, err := util.ExtractKeySuffix(string(rawKv.Key[0:endIndex])) + if err != nil { + return nil, err + } + if changeFeed != changefeedID { + continue + } info := &model.TaskStatus{} err = info.Unmarshal(rawKv.Value) if err != nil { @@ -369,7 +400,7 @@ func (c CDCEtcdClient) PutChangeFeedStatus( changefeedID string, status *model.ChangeFeedStatus, ) error { - key := GetEtcdKeyChangeFeedStatus(changefeedID) + key := GetEtcdKeyJob(changefeedID) value, err := status.Marshal() if err != nil { return errors.Trace(err) @@ -389,7 +420,7 @@ func (c CDCEtcdClient) PutAllChangeFeedStatus(ctx context.Context, infos map[mod if err != nil { return errors.Trace(err) } - key := GetEtcdKeyChangeFeedStatus(changefeedID) + key := GetEtcdKeyJob(changefeedID) ops = append(ops, clientv3.OpPut(key, storeVal)) if uint(len(ops)) >= embed.DefaultMaxTxnOps { _, err = txn.Then(ops...).Commit() diff --git a/cdc/owner.go b/cdc/owner.go index 6c5f8a12b3c..c77f259ac8a 100644 --- a/cdc/owner.go +++ b/cdc/owner.go @@ -1140,17 +1140,11 @@ func (o *ownerImpl) markProcessorDown(ctx context.Context, // lookup the task position for the processor pos, exist := positions[p.CaptureID] if !exist { - log.Warn("unkown processor deletion detected", - zap.String("processorid", p.ID), - zap.String("captureid", p.CaptureID)) return nil } // lookup the task position for the processor status, exist := statuses[p.CaptureID] if !exist { - log.Warn("unkown processor deletion detected", - zap.String("processorid", p.ID), - zap.String("capture", p.CaptureID)) return nil } snap := status.Snapshot(p.ChangeFeedID, diff --git a/cdc/processor.go b/cdc/processor.go index 79f16c55a55..77d081c2677 100644 --- a/cdc/processor.go +++ b/cdc/processor.go @@ -702,6 +702,14 @@ func (p *processor) stop(ctx context.Context) error { } p.tablesMu.Unlock() p.session.Close() + + if err := p.etcdCli.DeleteTaskPosition(ctx, p.changefeedID, p.captureID); err != nil { + return err + } + if err := p.etcdCli.DeleteTaskStatus(ctx, p.changefeedID, p.captureID); err != 
nil { + return err + } + return errors.Trace(p.deregister(ctx)) } diff --git a/cdc/scheduler.go b/cdc/scheduler.go index 84fb396345b..0ce3a42b2a9 100644 --- a/cdc/scheduler.go +++ b/cdc/scheduler.go @@ -15,324 +15,25 @@ package cdc import ( "context" - "sync" - "sync/atomic" - "time" "github.com/pingcap/ticdc/cdc/sink" "github.com/pingcap/errors" - "github.com/pingcap/failpoint" "github.com/pingcap/log" - "github.com/pingcap/ticdc/cdc/kv" "github.com/pingcap/ticdc/cdc/model" "github.com/pingcap/ticdc/pkg/util" - "go.etcd.io/etcd/clientv3" - "go.etcd.io/etcd/mvcc" - "go.etcd.io/etcd/mvcc/mvccpb" "go.uber.org/zap" - "golang.org/x/time/rate" ) -const ( - checkTaskKeyInterval = time.Second * 1 -) - -var ( - runProcessorWatcher = realRunProcessorWatcher - runProcessor = realRunProcessor -) - -// ChangeFeedWatcher is a changefeed watcher -type ChangeFeedWatcher struct { - lock sync.RWMutex - captureID string - pdEndpoints []string - etcdCli kv.CDCEtcdClient - infos map[string]model.ChangeFeedInfo -} - -// NewChangeFeedWatcher creates a new changefeed watcher -func NewChangeFeedWatcher(captureID string, pdEndpoints []string, cli kv.CDCEtcdClient) *ChangeFeedWatcher { - w := &ChangeFeedWatcher{ - captureID: captureID, - pdEndpoints: pdEndpoints, - etcdCli: cli, - infos: make(map[string]model.ChangeFeedInfo), - } - return w -} - -func (w *ChangeFeedWatcher) processPutKv(kv *mvccpb.KeyValue) (bool, string, model.ChangeFeedInfo, error) { - needRunWatcher := false - changefeedID, err := util.ExtractKeySuffix(string(kv.Key)) - if err != nil { - return needRunWatcher, "", model.ChangeFeedInfo{}, err - } - info := model.ChangeFeedInfo{} - err = info.Unmarshal(kv.Value) - if err != nil { - return needRunWatcher, changefeedID, info, err - } - w.lock.Lock() - _, ok := w.infos[changefeedID] - if !ok { - needRunWatcher = true - } - if info.AdminJobType == model.AdminStop { - // only handle model.AdminStop, the model.AdminRemove case will be handled in `processDeleteKv` - delete(w.infos, changefeedID) - } else { - w.infos[changefeedID] = info - } - w.lock.Unlock() - // TODO: this info is not copied, should be readonly - return needRunWatcher, changefeedID, info, nil -} - -func (w *ChangeFeedWatcher) processDeleteKv(kv *mvccpb.KeyValue) error { - changefeedID, err := util.ExtractKeySuffix(string(kv.Key)) - if err != nil { - return errors.Trace(err) - } - w.lock.Lock() - delete(w.infos, changefeedID) - w.lock.Unlock() - return nil -} - -// Watch watches changefeed key base -func (w *ChangeFeedWatcher) Watch(ctx context.Context, cb processorCallback) error { - errCh := make(chan error, 1) - - revision, infos, err := w.etcdCli.GetChangeFeeds(ctx) - if err != nil { - return errors.Trace(err) - } - for changefeedID, kv := range infos { - needRunWatcher, _, info, err := w.processPutKv(kv) - if err != nil { - return errors.Trace(err) - } - if needRunWatcher { - _, err := runProcessorWatcher(ctx, changefeedID, w.captureID, w.pdEndpoints, w.etcdCli, info, errCh, cb) - if err != nil { - return errors.Trace(err) - } - } - } - - watchCh := w.etcdCli.Client.Watch(ctx, kv.GetEtcdKeyChangeFeedList(), clientv3.WithPrefix(), clientv3.WithRev(revision)) - for { - select { - case <-ctx.Done(): - return ctx.Err() - case err := <-errCh: - return errors.Trace(err) - case resp, ok := <-watchCh: - if !ok { - log.Info("watcher is closed") - return nil - } - failpoint.Inject("WatchChangeFeedInfoCompactionErr", func() { - failpoint.Return(errors.Trace(mvcc.ErrCompacted)) - }) 
- respErr := resp.Err() - if respErr != nil { - return errors.Trace(respErr) - } - for _, ev := range resp.Events { - switch ev.Type { - case mvccpb.PUT: - needRunWatcher, changefeedID, info, err := w.processPutKv(ev.Kv) - if err != nil { - return errors.Trace(err) - } - if needRunWatcher { - _, err := runProcessorWatcher(ctx, changefeedID, w.captureID, w.pdEndpoints, w.etcdCli, info, errCh, cb) - if err != nil { - return errors.Trace(err) - } - } - case mvccpb.DELETE: - err := w.processDeleteKv(ev.Kv) - if err != nil { - return errors.Trace(err) - } - } - } - } - } -} - -// ProcessorWatcher is a processor watcher -type ProcessorWatcher struct { - pdEndpoints []string - changefeedID string - captureID string - etcdCli kv.CDCEtcdClient - info model.ChangeFeedInfo - checkpointTs uint64 - wg sync.WaitGroup - closed int32 -} - -// NewProcessorWatcher creates a new ProcessorWatcher instance -func NewProcessorWatcher( - changefeedID string, - captureID string, - pdEndpoints []string, - cli kv.CDCEtcdClient, - info model.ChangeFeedInfo, - checkpointTs uint64, -) *ProcessorWatcher { - return &ProcessorWatcher{ - changefeedID: changefeedID, - captureID: captureID, - pdEndpoints: pdEndpoints, - etcdCli: cli, - info: info, - checkpointTs: checkpointTs, - } -} - -func (w *ProcessorWatcher) isClosed() bool { - return atomic.LoadInt32(&w.closed) == 1 -} - -func (w *ProcessorWatcher) close() { - atomic.StoreInt32(&w.closed, 1) - w.wg.Wait() -} - -func (w *ProcessorWatcher) reopen() error { - if !w.isClosed() { - return errors.New("ProcessorWatcher is not closed") - } - atomic.StoreInt32(&w.closed, 0) - return nil -} - -// Watch wait for the key `/changefeed/task//cid>` appear and run the processor. -func (w *ProcessorWatcher) Watch(ctx context.Context, errCh chan<- error, cb processorCallback) { - defer w.wg.Done() - key := kv.GetEtcdKeyTaskStatus(w.changefeedID, w.captureID) - - getResp, err := w.etcdCli.Client.Get(ctx, key) - if err != nil { - errCh <- errors.Trace(err) - return - } - if getResp.Count == 0 { - rl := rate.NewLimiter(0.1, 5) - revision := getResp.Header.Revision - // wait for key to appear - log.Info("waiting dispatching tasks", - zap.String("key", key), - zap.Int64("rev", revision)) - watchCh := w.etcdCli.Client.Watch(ctx, key, clientv3.WithRev(revision)) - waitKeyLoop: - for { - if !rl.Allow() { - errCh <- errors.New("task key watcher exceeds rate limit") - return - } - select { - case <-ctx.Done(): - return - case resp, ok := <-watchCh: - if !ok { - log.Info("watcher is closed") - return - } - respErr := resp.Err() - if respErr != nil { - if respErr == mvcc.ErrCompacted { - continue waitKeyLoop - } - errCh <- errors.Trace(respErr) - return - } - for _, ev := range resp.Events { - switch ev.Type { - case mvccpb.PUT: - break waitKeyLoop - } - } - } - } - } - - cctx, cancel := context.WithCancel(ctx) - defer cancel() - err = runProcessor(cctx, w.pdEndpoints, w.info, w.changefeedID, w.captureID, w.checkpointTs, cb) - if err != nil { - errCh <- err - return - } - - for { - select { - case <-ctx.Done(): - err := ctx.Err() - if err != context.Canceled { - errCh <- err - } - return - case <-time.After(checkTaskKeyInterval): - resp, err := w.etcdCli.Client.Get(ctx, key) - if err != nil { - errCh <- errors.Trace(err) - return - } - // processor has been removed from this capture - if resp.Count == 0 { - return - } - } - } -} - -type processorCallback interface { - // OnRunProcessor is called when the processor is started. 
- OnRunProcessor(p *processor) - // OnStopProcessor is called when the processor is stopped. - OnStopProcessor(p *processor, err error) -} - -// realRunProcessorWatcher creates a new ProcessorWatcher and executes the Watch method. -func realRunProcessorWatcher( - ctx context.Context, - changefeedID string, - captureID string, - pdEndpoints []string, - etcdCli kv.CDCEtcdClient, - info model.ChangeFeedInfo, - errCh chan error, - cb processorCallback, -) (*ProcessorWatcher, error) { - status, err := etcdCli.GetChangeFeedStatus(ctx, changefeedID) - if err != nil && errors.Cause(err) != model.ErrChangeFeedNotExists { - return nil, errors.Trace(err) - } - checkpointTs := info.GetCheckpointTs(status) - sw := NewProcessorWatcher(changefeedID, captureID, pdEndpoints, etcdCli, info, checkpointTs) - ctx = util.PutChangefeedIDInCtx(ctx, changefeedID) - sw.wg.Add(1) - go sw.Watch(ctx, errCh, cb) - return sw, nil -} - -// realRunProcessor creates a new processor then starts it, and returns a channel to pass error. -func realRunProcessor( +// runProcessor creates a new processor then starts it. +func runProcessor( ctx context.Context, pdEndpoints []string, info model.ChangeFeedInfo, changefeedID string, captureID string, checkpointTs uint64, - cb processorCallback, -) error { +) (*processor, error) { opts := make(map[string]string, len(info.Opts)+2) for k, v := range info.Opts { opts[k] = v @@ -341,11 +42,11 @@ func realRunProcessor( opts[sink.OptCaptureID] = captureID filter, err := util.NewFilter(info.GetConfig()) if err != nil { - return errors.Trace(err) + return nil, errors.Trace(err) } sink, err := sink.NewSink(info.SinkURI, filter, opts) if err != nil { - return errors.Trace(err) + return nil, errors.Trace(err) } ctx, cancel := context.WithCancel(ctx) errCh := make(chan error, 1) @@ -357,23 +58,28 @@ func realRunProcessor( processor, err := NewProcessor(ctx, pdEndpoints, info, sink, changefeedID, captureID, checkpointTs) if err != nil { cancel() - return err + return nil, err } log.Info("start to run processor", zap.String("changefeed id", changefeedID)) - if cb != nil { - cb.OnRunProcessor(processor) - } - processor.Run(ctx, errCh) go func() { err := <-errCh - if cb != nil { - cb.OnStopProcessor(processor, err) + if errors.Cause(err) != context.Canceled { + log.Error("error on running processor", + zap.String("captureid", captureID), + zap.String("changefeedid", changefeedID), + zap.String("processorid", processor.id), + zap.Error(err)) + } else { + log.Info("processor exited", + zap.String("captureid", captureID), + zap.String("changefeedid", changefeedID), + zap.String("processorid", processor.id)) } cancel() }() - return nil + return processor, nil } diff --git a/cdc/scheduler_test.go b/cdc/scheduler_test.go index 3f81b5b6537..dc2bab68785 100644 --- a/cdc/scheduler_test.go +++ b/cdc/scheduler_test.go @@ -16,20 +16,11 @@ package cdc import ( "context" "net/url" - "sync" - "sync/atomic" - "time" "github.com/pingcap/check" - "github.com/pingcap/errors" - "github.com/pingcap/failpoint" - "github.com/pingcap/ticdc/cdc/kv" - "github.com/pingcap/ticdc/cdc/model" "github.com/pingcap/ticdc/pkg/etcd" "github.com/pingcap/ticdc/pkg/util" - "go.etcd.io/etcd/clientv3" "go.etcd.io/etcd/embed" - "go.etcd.io/etcd/mvcc" "golang.org/x/sync/errgroup" ) @@ -43,12 +34,6 @@ type schedulerSuite struct { var _ = check.Suite(&schedulerSuite{}) -var ( - runProcessorCount int32 - runChangeFeedWatcherCount int32 - errRunProcessor = errors.New("mock run 
processor error") -) - // Set up a embed etcd using free ports. func (s *schedulerSuite) SetUpTest(c *check.C) { dir := c.MkDir() @@ -67,254 +52,3 @@ func (s *schedulerSuite) TearDownTest(c *check.C) { c.Errorf("Error group error: %s", err) } } - -func mockRunProcessor( - ctx context.Context, - pdEndpoints []string, - detail model.ChangeFeedInfo, - changefeedID string, - captureID string, - checkpointTs uint64, - _ processorCallback, -) error { - atomic.AddInt32(&runProcessorCount, 1) - return nil -} - -func mockRunProcessorError( - ctx context.Context, - pdEndpoints []string, - detail model.ChangeFeedInfo, - changefeedID string, - captureID string, - checkpointTs uint64, - _ processorCallback, -) error { - return errRunProcessor -} - -func mockRunProcessorWatcher( - tx context.Context, - changefeedID string, - captureID string, - pdEndpoints []string, - etcdCli kv.CDCEtcdClient, - detail model.ChangeFeedInfo, - errCh chan error, - _ processorCallback, -) (*ProcessorWatcher, error) { - atomic.AddInt32(&runChangeFeedWatcherCount, 1) - return nil, nil -} - -func (s *schedulerSuite) TestProcessorWatcher(c *check.C) { - var ( - changefeedID = "test-changefeed" - captureID = "test-capture" - pdEndpoints = []string{} - detail = model.ChangeFeedInfo{} - key = kv.GetEtcdKeyTaskStatus(changefeedID, captureID) - ) - - oriRunProcessor := runProcessor - runProcessor = mockRunProcessor - defer func() { - runProcessor = oriRunProcessor - }() - - curl := s.clientURL.String() - etcdCli, err := clientv3.New(clientv3.Config{ - Endpoints: []string{curl}, - DialTimeout: 3 * time.Second, - }) - c.Assert(err, check.IsNil) - defer etcdCli.Close() - - cli := kv.NewCDCEtcdClient(etcdCli) - - // create a processor - _, err = cli.Client.Put(context.Background(), key, "{}") - c.Assert(err, check.IsNil) - - // processor exists before watch starts - errCh := make(chan error, 1) - sw, err := runProcessorWatcher(context.Background(), changefeedID, captureID, pdEndpoints, cli, detail, errCh, nil) - c.Assert(err, check.IsNil) - c.Assert(util.WaitSomething(10, time.Millisecond*50, func() bool { - return atomic.LoadInt32(&runProcessorCount) == 1 - }), check.IsTrue) - - // delete the processor - _, err = cli.Client.Delete(context.Background(), key) - c.Assert(err, check.IsNil) - time.Sleep(time.Second) - sw.close() - c.Assert(sw.isClosed(), check.IsTrue) - - // check ProcessorWatcher watch processor key can ben canceled - err = sw.reopen() - c.Assert(err, check.IsNil) - c.Assert(sw.isClosed(), check.IsFalse) - ctx, cancel := context.WithCancel(context.Background()) - sw.wg.Add(1) - go sw.Watch(ctx, errCh, nil) - cancel() - sw.close() - c.Assert(sw.isClosed(), check.IsTrue) - - // check watcher can find new processor in watch loop - errCh2 := make(chan error, 1) - _, err = runProcessorWatcher(context.Background(), changefeedID, captureID, pdEndpoints, cli, detail, errCh2, nil) - c.Assert(err, check.IsNil) - _, err = cli.Client.Put(context.Background(), key, "{}") - c.Assert(err, check.IsNil) - c.Assert(util.WaitSomething(10, time.Millisecond*50, func() bool { - return atomic.LoadInt32(&runProcessorCount) == 2 - }), check.IsTrue) -} - -func (s *schedulerSuite) TestProcessorWatcherError(c *check.C) { - var ( - changefeedID = "test-changefeed-err" - captureID = "test-capture-err" - pdEndpoints = []string{} - detail = model.ChangeFeedInfo{} - key = kv.GetEtcdKeyTaskStatus(changefeedID, captureID) - ) - - oriRunProcessor := runProcessor - runProcessor = mockRunProcessorError - defer func() { - runProcessor = oriRunProcessor - }() - 
- curl := s.clientURL.String() - etcdCli, err := clientv3.New(clientv3.Config{ - Endpoints: []string{curl}, - DialTimeout: 3 * time.Second, - }) - c.Assert(err, check.IsNil) - defer etcdCli.Close() - cli := kv.NewCDCEtcdClient(etcdCli) - - // create a processor - _, err = cli.Client.Put(context.Background(), key, "{}") - c.Assert(err, check.IsNil) - - errCh := make(chan error, 1) - sw, err := runProcessorWatcher(context.Background(), changefeedID, captureID, pdEndpoints, cli, detail, errCh, nil) - c.Assert(err, check.IsNil) - sw.wg.Add(1) - go sw.Watch(context.Background(), errCh, nil) - - c.Assert(util.WaitSomething(10, time.Millisecond*50, func() bool { - select { - case err := <-errCh: - return errors.Cause(err) == errRunProcessor - default: - return false - } - }), check.IsTrue) - - sw.close() - c.Assert(sw.isClosed(), check.IsTrue) -} - -func (s *schedulerSuite) TestChangeFeedWatcher(c *check.C) { - var ( - changefeedID = "test-changefeed-watcher" - captureID = "test-capture" - pdEndpoints = []string{} - sinkURI = "root@tcp(127.0.0.1:3306)/test" - detail = &model.ChangeFeedInfo{SinkURI: sinkURI} - key = kv.GetEtcdKeyChangeFeedInfo(changefeedID) - watcherRetry int64 = 0 - ) - - oriRunProcessorWatcher := runProcessorWatcher - runProcessorWatcher = mockRunProcessorWatcher - defer func() { - runProcessorWatcher = oriRunProcessorWatcher - }() - - curl := s.clientURL.String() - etcdCli, err := clientv3.New(clientv3.Config{ - Endpoints: []string{curl}, - DialTimeout: 3 * time.Second, - }) - c.Assert(err, check.IsNil) - defer etcdCli.Close() - cli := kv.NewCDCEtcdClient(etcdCli) - - ctx, cancel := context.WithCancel(context.Background()) - w := NewChangeFeedWatcher(captureID, pdEndpoints, cli) - - var wg sync.WaitGroup - wg.Add(1) - go func() { - defer wg.Done() - for { - err2 := w.Watch(ctx, nil) - switch errors.Cause(err2) { - case nil, context.Canceled: - return - case mvcc.ErrCompacted: - atomic.AddInt64(&watcherRetry, 1) - continue - default: - c.Fatal(err2) - } - } - }() - - // short wait to ensure ChangeFeedWatcher has started watch loop - // TODO: test watch key apperance with revision works as expected - time.Sleep(time.Millisecond * 100) - - // create a changefeed - err = cli.SaveChangeFeedInfo(context.Background(), detail, changefeedID) - c.Assert(err, check.IsNil) - c.Assert(util.WaitSomething(10, time.Millisecond*50, func() bool { - return atomic.LoadInt32(&runChangeFeedWatcherCount) == 1 - }), check.IsTrue) - w.lock.RLock() - c.Assert(len(w.infos), check.Equals, 1) - w.lock.RUnlock() - - // delete the changefeed - _, err = cli.Client.Delete(context.Background(), key) - c.Assert(err, check.IsNil) - c.Assert(util.WaitSomething(10, time.Millisecond*50, func() bool { - w.lock.RLock() - defer w.lock.RUnlock() - return len(w.infos) == 0 - }), check.IsTrue) - - c.Assert(failpoint.Enable("github.com/pingcap/ticdc/cdc/WatchChangeFeedInfoCompactionErr", "1*return"), check.IsNil) - - // create a changefeed - err = cli.SaveChangeFeedInfo(context.Background(), detail, changefeedID) - c.Assert(err, check.IsNil) - c.Assert(util.WaitSomething(10, time.Millisecond*50, func() bool { - return atomic.LoadInt32(&runChangeFeedWatcherCount) == 2 - }), check.IsTrue) - w.lock.RLock() - c.Assert(len(w.infos), check.Equals, 1) - w.lock.RUnlock() - c.Assert(atomic.LoadInt64(&watcherRetry), check.Equals, int64(1)) - - c.Assert(failpoint.Disable("github.com/pingcap/ticdc/cdc/WatchChangeFeedInfoCompactionErr"), check.IsNil) - - // dispatch a stop changefeed admin job - detail.AdminJobType = 
model.AdminStop
-    err = cli.SaveChangeFeedInfo(context.Background(), detail, changefeedID)
-    c.Assert(err, check.IsNil)
-    c.Assert(util.WaitSomething(10, time.Millisecond*50, func() bool {
-        w.lock.RLock()
-        defer w.lock.RUnlock()
-        return len(w.infos) == 0
-    }), check.IsTrue)
-
-    cancel()
-    wg.Wait()
-}
diff --git a/cdc/task.go b/cdc/task.go
new file mode 100644
index 00000000000..e6f6a968bb4
--- /dev/null
+++ b/cdc/task.go
@@ -0,0 +1,228 @@
+// Copyright 2020 PingCAP, Inc.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package cdc
+
+import (
+    "context"
+    "errors"
+
+    "github.com/pingcap/log"
+    "github.com/pingcap/ticdc/cdc/model"
+    "go.etcd.io/etcd/clientv3"
+    "go.uber.org/zap"
+)
+
+// TaskEventOp is the operation of a task
+type TaskEventOp string
+
+// Task event operations
+const (
+    TaskOpCreate TaskEventOp = "create"
+    TaskOpDelete TaskEventOp = "delete"
+)
+
+// Task is dispatched by the owner
+type Task struct {
+    ChangeFeedID string
+    CheckpointTS uint64
+}
+
+// TaskEvent represents that a task is created or deleted
+type TaskEvent struct {
+    Op   TaskEventOp
+    Task *Task
+    Err  error
+}
+
+// TaskWatcher watches for new tasks
+type TaskWatcher struct {
+    capture *Capture
+    cfg     *TaskWatcherConfig
+
+    events map[string]*TaskEvent
+}
+
+// TaskWatcherConfig configures a watcher
+type TaskWatcherConfig struct {
+    Prefix      string
+    ChannelSize int64
+}
+
+// NewTaskWatcher returns a TaskWatcher
+func NewTaskWatcher(c *Capture, cfg *TaskWatcherConfig) *TaskWatcher {
+    return &TaskWatcher{capture: c, cfg: cfg}
+}
+
+// Watch watches for new tasks and returns them on a channel
+func (w *TaskWatcher) Watch(ctx context.Context) <-chan *TaskEvent {
+    c := make(chan *TaskEvent, w.cfg.ChannelSize)
+    go w.watch(ctx, c)
+    return c
+}
+
+func (w *TaskWatcher) watch(ctx context.Context, c chan *TaskEvent) {
+    etcd := w.capture.etcdClient.Client
+
+    // Leader is required in this context to prevent reading outdated data
+    // from a stale leader
+    ctx = clientv3.WithRequireLeader(ctx)
+
+    // Send a task event to the channel, checking ctx.Done() to avoid blocking
+    send := func(ctx context.Context, ev *TaskEvent) error {
+        select {
+        case <-ctx.Done():
+            close(c)
+            return ctx.Err()
+        case c <- ev:
+        }
+        return nil
+    }
+restart:
+    // Load all the existing tasks
+    events := make(map[string]*TaskEvent)
+    resp, err := etcd.Get(ctx, w.cfg.Prefix, clientv3.WithPrefix())
+    if err != nil {
+        _ = send(ctx, &TaskEvent{Err: err})
+        return
+    }
+    for _, kv := range resp.Kvs {
+        ev, err := w.parseTaskEvent(ctx, kv.Key, kv.Value)
+        if err != nil {
+            log.Warn("parse task event failed",
+                zap.String("captureid", w.capture.info.ID),
+                zap.Error(err))
+            continue
+        }
+        events[ev.Task.ChangeFeedID] = ev
+    }
+
+    // Rebuild the missed events
+    // When an error occurs during the watch, the watch routine is restarted;
+    // in that case, some events may be missed. Rebuild the events by comparing
+    // the new task list with the last successfully recorded tasks.
+ events = w.rebuildTaskEvents(events) + for _, ev := range events { + if err := send(ctx, ev); err != nil { + return + } + } + + wch := etcd.Watch(ctx, w.cfg.Prefix, + clientv3.WithPrefix(), + clientv3.WithPrevKV(), + clientv3.WithRev(resp.Header.Revision+1)) + for wresp := range wch { + if wresp.Err() != nil { + goto restart + } + for _, ev := range wresp.Events { + if ev.Type == clientv3.EventTypePut { + ev, err := w.parseTaskEvent(ctx, ev.Kv.Key, ev.Kv.Value) + if err != nil { + log.Warn("parse task event failed", + zap.String("captureid", w.capture.info.ID), + zap.Error(err)) + continue + } + w.events[ev.Task.ChangeFeedID] = ev + if err := send(ctx, ev); err != nil { + return + } + } else if ev.Type == clientv3.EventTypeDelete { + task, err := w.parseTask(ctx, ev.PrevKv.Key, ev.PrevKv.Value) + if err != nil { + log.Warn("parse task failed", + zap.String("captureid", w.capture.info.ID), + zap.Error(err)) + continue + } + delete(w.events, task.ChangeFeedID) + if err := send(ctx, &TaskEvent{Op: TaskOpDelete, Task: task}); err != nil { + return + } + } + } + } + close(c) +} + +func (w *TaskWatcher) parseTask(ctx context.Context, + key, val []byte) (*Task, error) { + if len(key) <= len(w.cfg.Prefix) { + return nil, errors.New("invalid task key: " + string(key)) + } + changeFeedID := string(key[len(w.cfg.Prefix)+1:]) + cf, err := w.capture.etcdClient.GetChangeFeedInfo(ctx, changeFeedID) + if err != nil { + return nil, err + } + status, err := w.capture.etcdClient.GetChangeFeedStatus(ctx, changeFeedID) + if err != nil { + return nil, err + } + checkpointTs := cf.GetCheckpointTs(status) + return &Task{ChangeFeedID: changeFeedID, CheckpointTS: checkpointTs}, nil +} + +func (w *TaskWatcher) parseTaskEvent(ctx context.Context, key, val []byte) (*TaskEvent, error) { + task, err := w.parseTask(ctx, key, val) + if err != nil { + log.Warn("parse task failed", + zap.String("captureid", w.capture.info.ID), + zap.Error(err)) + return nil, err + } + + taskStatus := &model.TaskStatus{} + if err := taskStatus.Unmarshal(val); err != nil { + log.Warn("unmarshal task status failed", + zap.String("captureid", w.capture.info.ID), + zap.Error(err)) + return nil, err + } + var op TaskEventOp + switch taskStatus.AdminJobType { + case model.AdminNone, model.AdminResume: + op = TaskOpCreate + case model.AdminStop, model.AdminRemove: + op = TaskOpDelete + } + return &TaskEvent{Op: op, Task: task}, nil +} + +func (w *TaskWatcher) rebuildTaskEvents(latest map[string]*TaskEvent) map[string]*TaskEvent { + events := make(map[string]*TaskEvent) + outdated := w.events + for id, ev := range outdated { + // Check if the task still exists + if nev, ok := latest[id]; ok { + if ev.Op != nev.Op { + events[id] = nev + } + } else if ev.Op != TaskOpDelete { + events[id] = &TaskEvent{Op: TaskOpDelete, Task: ev.Task} + } + } + + for id, ev := range latest { + if _, ok := outdated[id]; !ok { + events[id] = ev + } + } + + // Update to the latest tasks + w.events = events + + return events +} diff --git a/cdc/task_test.go b/cdc/task_test.go new file mode 100644 index 00000000000..fa6fa3690f4 --- /dev/null +++ b/cdc/task_test.go @@ -0,0 +1,286 @@ +// Copyright 2020 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. 
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package cdc
+
+import (
+    "context"
+    "math"
+    "time"
+
+    "github.com/pingcap/check"
+    "github.com/pingcap/ticdc/cdc/kv"
+    "github.com/pingcap/ticdc/cdc/model"
+    "github.com/pingcap/ticdc/pkg/etcd"
+    "go.etcd.io/etcd/clientv3"
+    "go.etcd.io/etcd/embed"
+)
+
+type taskSuite struct {
+    s         *embed.Etcd
+    c         *clientv3.Client
+    w         *TaskWatcher
+    endpoints []string
+}
+
+var _ = check.Suite(&taskSuite{})
+
+func (s *taskSuite) SetUpTest(c *check.C) {
+    dir := c.MkDir()
+    url, etcd, err := etcd.SetupEmbedEtcd(dir)
+    c.Assert(err, check.IsNil)
+
+    endpoints := []string{url.String()}
+    client, err := clientv3.New(clientv3.Config{
+        Endpoints: endpoints,
+    })
+    c.Assert(err, check.IsNil)
+
+    // Create a task watcher
+    capture := &Capture{
+        pdEndpoints: s.endpoints,
+        etcdClient:  kv.NewCDCEtcdClient(client),
+        processors:  make(map[string]*processor),
+        info:        &model.CaptureInfo{ID: "task-suite-capture"},
+    }
+    c.Assert(capture, check.NotNil)
+    watcher := NewTaskWatcher(capture, &TaskWatcherConfig{
+        Prefix: kv.TaskStatusKeyPrefix + "/" + capture.info.ID,
+    })
+    c.Assert(watcher, check.NotNil)
+
+    s.s = etcd
+    s.c = client
+    s.w = watcher
+    s.endpoints = endpoints
+}
+func (s *taskSuite) TearDownTest(c *check.C) {
+    s.s.Close()
+    s.c.Close()
+}
+
+func (s *taskSuite) TestNewTaskWatcher(c *check.C) {
+    // Create a capture instance by initializing the struct directly;
+    // NewCapture cannot be used here because it requires
+    // initializing the PD service, which does not support
+    // being embedded.
+ capture := &Capture{ + pdEndpoints: s.endpoints, + etcdClient: kv.NewCDCEtcdClient(s.c), + processors: make(map[string]*processor), + info: &model.CaptureInfo{ID: "task-suite-capture"}, + } + c.Assert(capture, check.NotNil) + c.Assert(NewTaskWatcher(capture, &TaskWatcherConfig{ + Prefix: kv.TaskStatusKeyPrefix + "/" + capture.info.ID, + }), check.NotNil) + capture.Close(context.Background()) +} + +func (s *taskSuite) setupFeedInfo(c *check.C, changeFeedID string) { + client := kv.NewCDCEtcdClient(s.c) + // Create the change feed + c.Assert(client.SaveChangeFeedInfo(s.c.Ctx(), &model.ChangeFeedInfo{ + SinkURI: "mysql://fake", + StartTs: 0, + TargetTs: math.MaxUint64, + CreateTime: time.Now(), + }, changeFeedID), check.IsNil) + + // Fake the change feed status + c.Assert(client.PutChangeFeedStatus(s.c.Ctx(), changeFeedID, + &model.ChangeFeedStatus{ + ResolvedTs: 1, + CheckpointTs: 1, + }), check.IsNil) +} +func (s *taskSuite) teardownFeedInfo(c *check.C, changeFeedID string) { + etcd := s.c + // Delete change feed info + resp, err := etcd.Delete(s.c.Ctx(), kv.GetEtcdKeyChangeFeedInfo(changeFeedID), clientv3.WithPrefix()) + c.Assert(err, check.IsNil) + c.Assert(resp, check.NotNil) + + // Delete change feed status(job status) + resp, err = etcd.Delete(s.c.Ctx(), kv.GetEtcdKeyJob(changeFeedID), clientv3.WithPrefix()) + c.Assert(err, check.IsNil) + c.Assert(resp, check.NotNil) +} + +func (s *taskSuite) TestParseTask(c *check.C) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + changeFeedID := "task-suite-changefeed" + s.setupFeedInfo(c, changeFeedID) + defer s.teardownFeedInfo(c, changeFeedID) + + tests := []struct { + Desc string + Key []byte + Expected *Task + }{ + {"nil task key", nil, nil}, + {"short task key", []byte("test"), nil}, + {"normal task key", + []byte(kv.GetEtcdKeyTaskStatus(changeFeedID, s.w.capture.info.ID)), + &Task{changeFeedID, 1}}, + } + for _, t := range tests { + c.Log("testing ", t.Desc) + task, err := s.w.parseTask(ctx, t.Key, nil) + if t.Expected == nil { + c.Assert(err, check.NotNil) + c.Assert(task, check.IsNil) + } else { + c.Assert(task, check.DeepEquals, t.Expected) + } + } +} + +func (s *taskSuite) TestWatch(c *check.C) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + s.setupFeedInfo(c, "changefeed-1") + defer s.teardownFeedInfo(c, "changefeed-1") + + client := kv.NewCDCEtcdClient(s.c) + // Watch with a canceled context + failedCtx, cancel := context.WithCancel(context.Background()) + cancel() + ev := <-s.w.Watch(failedCtx) + if ev != nil { + c.Assert(ev.Err, check.NotNil) + } + + // Watch with a normal context + ch := s.w.Watch(context.Background()) + + // Put task changefeed-1 + c.Assert(client.PutTaskStatus(s.c.Ctx(), "changefeed-1", + s.w.capture.info.ID, + &model.TaskStatus{}), check.IsNil) + ev = <-ch + c.Assert(len(ch), check.Equals, 0) + c.Assert(ev, check.NotNil) + c.Assert(ev.Err, check.IsNil) + c.Assert(ev.Op, check.Equals, TaskOpCreate) + c.Assert(ev.Task.ChangeFeedID, check.Equals, "changefeed-1") + c.Assert(ev.Task.CheckpointTS, check.Equals, uint64(1)) + + // Stop the task changefeed-1 + c.Assert(client.PutTaskStatus(s.c.Ctx(), "changefeed-1", + s.w.capture.info.ID, + &model.TaskStatus{AdminJobType: model.AdminStop}), check.IsNil) + ev = <-ch + c.Assert(len(ch), check.Equals, 0) + c.Assert(ev, check.NotNil) + c.Assert(ev.Err, check.IsNil) + c.Assert(ev.Op, check.Equals, TaskOpDelete) + c.Assert(ev.Task.ChangeFeedID, check.Equals, "changefeed-1") + c.Assert(ev.Task.CheckpointTS, 
check.Equals, uint64(1)) + + // Resume the task changefeed-1 + c.Assert(client.PutTaskStatus(s.c.Ctx(), "changefeed-1", + s.w.capture.info.ID, + &model.TaskStatus{AdminJobType: model.AdminResume}), check.IsNil) + ev = <-ch + c.Assert(len(ch), check.Equals, 0) + c.Assert(ev, check.NotNil) + c.Assert(ev.Err, check.IsNil) + c.Assert(ev.Op, check.Equals, TaskOpCreate) + c.Assert(ev.Task.ChangeFeedID, check.Equals, "changefeed-1") + c.Assert(ev.Task.CheckpointTS, check.Equals, uint64(1)) + + // Delete the task changefeed-1 + c.Assert(client.DeleteTaskStatus(ctx, "changefeed-1", + s.w.capture.info.ID), check.IsNil) + ev = <-ch + c.Assert(len(ch), check.Equals, 0) + c.Assert(ev, check.NotNil) + c.Assert(ev.Err, check.IsNil) + c.Assert(ev.Op, check.Equals, TaskOpDelete) + c.Assert(ev.Task.ChangeFeedID, check.Equals, "changefeed-1") + c.Assert(ev.Task.CheckpointTS, check.Equals, uint64(1)) + + // Put task changefeed-2 which does not exist + c.Assert(client.PutTaskStatus(s.c.Ctx(), "changefeed-2", + s.w.capture.info.ID, + &model.TaskStatus{}), check.IsNil) + c.Assert(len(ch), check.Equals, 0) +} + +func (s *taskSuite) TestRebuildTaskEvents(c *check.C) { + type T map[string]*TaskEvent + tests := []struct { + desc string + outdated T + latest T + expected T + }{ + { + desc: "nil outdated", + outdated: nil, + latest: T{"changefeed-1": &TaskEvent{TaskOpCreate, &Task{"changeed-1", 0}, nil}}, + expected: T{"changefeed-1": &TaskEvent{TaskOpCreate, &Task{"changeed-1", 0}, nil}}, + }, + { + desc: "empty outdated", + outdated: nil, + latest: T{"changefeed-1": &TaskEvent{TaskOpCreate, &Task{"changeed-1", 0}, nil}}, + expected: T{"changefeed-1": &TaskEvent{TaskOpCreate, &Task{"changeed-1", 0}, nil}}, + }, + { + desc: "need to be updated", + outdated: T{"changefeed-1": &TaskEvent{TaskOpCreate, &Task{"changeed-1", 0}, nil}}, + latest: T{"changefeed-1": &TaskEvent{TaskOpDelete, &Task{"changeed-1", 0}, nil}}, + expected: T{"changefeed-1": &TaskEvent{TaskOpDelete, &Task{"changeed-1", 0}, nil}}, + }, + { + desc: "miss some events", + outdated: T{"changefeed-1": &TaskEvent{TaskOpCreate, &Task{"changeed-1", 0}, nil}}, + latest: T{ + "changefeed-1": &TaskEvent{TaskOpDelete, &Task{"changeed-1", 0}, nil}, + "changefeed-2": &TaskEvent{TaskOpCreate, &Task{"changefeed-2", 0}, nil}, + }, + expected: T{ + "changefeed-1": &TaskEvent{TaskOpDelete, &Task{"changeed-1", 0}, nil}, + "changefeed-2": &TaskEvent{TaskOpCreate, &Task{"changefeed-2", 0}, nil}, + }, + }, + { + desc: "left some events", + outdated: T{ + "changefeed-1": &TaskEvent{TaskOpDelete, &Task{"changeed-1", 0}, nil}, + "changefeed-2": &TaskEvent{TaskOpCreate, &Task{"changefeed-2", 0}, nil}, + }, + latest: T{"changefeed-1": &TaskEvent{TaskOpCreate, &Task{"changeed-1", 0}, nil}}, + expected: T{ + "changefeed-1": &TaskEvent{TaskOpCreate, &Task{"changeed-1", 0}, nil}, + "changefeed-2": &TaskEvent{TaskOpDelete, &Task{"changefeed-2", 0}, nil}, + }, + }, + } + + for _, t := range tests { + c.Log("RUN CASE: ", t.desc) + s.w.events = t.outdated + got := s.w.rebuildTaskEvents(t.latest) + c.Assert(len(got), check.Equals, len(t.expected)) + for k, v := range got { + e := t.expected[k] + c.Assert(v.Err, check.IsNil) + c.Assert(v.Op, check.Equals, e.Op) + c.Assert(v.Task, check.DeepEquals, e.Task) + } + } +} diff --git a/tests/_utils/ensure b/tests/_utils/ensure new file mode 100755 index 00000000000..b65fd6ba4f4 --- /dev/null +++ b/tests/_utils/ensure @@ -0,0 +1,20 @@ +#!/bin/bash +# $1: max retries +# $2: command to run + +total=$1 +shift +for ((i = 1 ; i <= $total ; i++)); do + 
# run the command
+    echo $*
+    bash -c "$*"
+    if [ $? == 0 ]; then
+        echo "run task successfully"
+        exit 0
+    fi
+    echo "run task failed $i-th time, retry later"
+    sleep 2
+done
+
+echo "run task failed"
+exit 1
diff --git a/tests/_utils/random_kill_process b/tests/_utils/random_kill_process
new file mode 100755
index 00000000000..ef781c89110
--- /dev/null
+++ b/tests/_utils/random_kill_process
@@ -0,0 +1,32 @@
+#!/bin/bash
+# parameter 1: process name
+
+process=$1
+retry_count=20
+pids=($(pidof $process))
+echo "list pids " ${pids[@]}
+
+if [ ${#pids[@]} == 0 ]; then
+    exit 0
+fi
+
+pid=${pids[$RANDOM % ${#pids[@]}]}
+
+echo "kill pid $pid"
+kill $pid
+
+counter=0
+while [ $counter -lt $retry_count ]; do
+    ps -p $pid > /dev/null 2>&1
+    ret=$?
+    if [ "$ret" != "0" ]; then
+        echo "process $pid already exit"
+        exit 0
+    fi
+    ((counter+=1))
+    sleep 0.5
+    echo "wait process $pid exit for $counter-th time..."
+done
+
+echo "wait process $pid exit timeout"
+exit 1
diff --git a/tests/availability/run.sh b/tests/availability/run.sh
new file mode 100644
index 00000000000..f9fc98598ab
--- /dev/null
+++ b/tests/availability/run.sh
@@ -0,0 +1,90 @@
+#!/bin/bash
+
+set -e
+
+CUR=$( cd "$( dirname "${BASH_SOURCE[0]}" )" && pwd )
+source $CUR/../_utils/test_prepare
+WORK_DIR=$OUT_DIR/$TEST_NAME
+CDC_BINARY=cdc.test
+
+export DOWN_TIDB_HOST
+export DOWN_TIDB_PORT
+
+function prepare() {
+    rm -rf $WORK_DIR && mkdir -p $WORK_DIR
+
+    start_tidb_cluster $WORK_DIR
+
+    cd $WORK_DIR
+
+    # record tso before we create tables to skip the system table DDLs
+    start_ts=$(cdc cli tso query --pd=http://$UP_PD_HOST:$UP_PD_PORT)
+
+    run_sql "CREATE table test.availability(id int primary key, val int);"
+
+    cdc cli changefeed create --start-ts=$start_ts --sink-uri="mysql://root@127.0.0.1:3306/"
+}
+
+function sql_check() {
+    # run the checks in sequence with short-circuit evaluation; if an error happens,
+    # the following statements will not be executed
+
+    # check table availability.
+ echo "run sql_check", ${DOWN_TIDB_HOST} + run_sql "SELECT id, val FROM test.availability;" ${DOWN_TIDB_HOST} ${DOWN_TIDB_PORT} && \ + check_contains "id: 1" && \ + check_contains "val: 1" && \ + check_contains "id: 2" && \ + check_contains "val: 22" && \ + check_not_contains "id: 3" +} +export -f sql_check + +function check_result() { + ensure 50 sql_check +} + +function nonempty(){ + sql=$* + run_sql "$sql" ${DOWN_TIDB_HOST} ${DOWN_TIDB_PORT} && \ + check_contains "id:" +} +export -f nonempty + +function availability_test() { + # start one server + run_cdc_server $WORK_DIR $CDC_BINARY + + # wait for the tables to appear + check_table_exists test.availability ${DOWN_TIDB_HOST} ${DOWN_TIDB_PORT} 20 + + run_sql "INSERT INTO test.availability(id, val) VALUES (1, 1);" + ensure 20 nonempty 'select id, val from test.availability where id=1 and val=1' + + # kill the server, then start another one + echo "restart the $CDC_BINARY" + cleanup_process $CDC_BINARY + run_cdc_server $WORK_DIR $CDC_BINARY + run_sql "INSERT INTO test.availability(id, val) VALUES (2, 2);" + ensure 20 nonempty 'select id, val from test.availability where id=2 and val=2' + + # run two other servers, three servers in total + run_cdc_server $WORK_DIR $CDC_BINARY + run_cdc_server $WORK_DIR $CDC_BINARY + # randomly kill a server, TODO: kill the owner instead + random_kill_process $CDC_BINARY + run_sql "INSERT INTO test.availability(id, val) VALUES (3, 3);" + ensure 20 nonempty 'select id, val from test.availability where id=3 and val=3' + + random_kill_process $CDC_BINARY + run_sql "UPDATE test.availability set val = 22 where id = 2;" + run_sql "DELETE from test.availability where id = 3;" + + check_result + cleanup_process $CDC_BINARY +} + +trap stop_tidb_cluster EXIT +prepare $* +availability_test $* +echo "[$(date)] <<<<<< run test case $TEST_NAME success! >>>>>>"