Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

changefeed: Fix cannot remove changefeed when err gcttl exceeded (#2429) #2454

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
29 changes: 19 additions & 10 deletions cdc/owner/changefeed.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ type changefeed struct {
scheduler *scheduler
barriers *barriers
feedStateManager *feedStateManager
gcManager *gcManager
gcManager GcManager

schema *schemaWrap4Owner
sink AsyncSink
Expand Down Expand Up @@ -68,7 +68,7 @@ type changefeed struct {
newSink func(ctx cdcContext.Context) (AsyncSink, error)
}

func newChangefeed(id model.ChangeFeedID, gcManager *gcManager) *changefeed {
func newChangefeed(id model.ChangeFeedID, gcManager GcManager) *changefeed {
c := &changefeed{
id: id,
scheduler: newScheduler(),
Expand All @@ -86,7 +86,7 @@ func newChangefeed(id model.ChangeFeedID, gcManager *gcManager) *changefeed {
}

func newChangefeed4Test(
id model.ChangeFeedID, gcManager *gcManager,
id model.ChangeFeedID, gcManager GcManager,
newDDLPuller func(ctx cdcContext.Context, startTs uint64) (DDLPuller, error),
newSink func(ctx cdcContext.Context) (AsyncSink, error),
) *changefeed {
Expand Down Expand Up @@ -119,20 +119,29 @@ func (c *changefeed) Tick(ctx cdcContext.Context, state *model.ChangefeedReactor
}
}

func (c *changefeed) tick(ctx cdcContext.Context, state *model.ChangefeedReactorState, captures map[model.CaptureID]*model.CaptureInfo) error {
c.state = state
c.feedStateManager.Tick(state)
checkpointTs := c.state.Info.GetCheckpointTs(c.state.Status)
switch c.state.Info.State {
case model.StateNormal, model.StateStopped, model.StateError:
if err := c.gcManager.CheckStaleCheckpointTs(ctx, checkpointTs); err != nil {
// checkStaleCheckpointTs asks the GC manager whether checkpointTs is stale,
// but only while the changefeed is in the Normal, Stopped or Error state.
// In any other state the check is skipped and nil is returned. An error
// reported by the GC manager is returned wrapped with errors.Trace.
func (c *changefeed) checkStaleCheckpointTs(ctx cdcContext.Context, checkpointTs uint64) error {
	switch c.state.Info.State {
	case model.StateNormal, model.StateStopped, model.StateError:
		// These are the states for which the staleness check applies.
	default:
		return nil
	}

	staleErr := c.gcManager.checkStaleCheckpointTs(ctx, checkpointTs)
	if staleErr != nil {
		return errors.Trace(staleErr)
	}
	return nil
}

func (c *changefeed) tick(ctx cdcContext.Context, state *model.ChangefeedReactorState, captures map[model.CaptureID]*model.CaptureInfo) error {
c.state = state
c.feedStateManager.Tick(state)
if !c.feedStateManager.ShouldRunning() {
c.releaseResources()
return nil
}

checkpointTs := c.state.Info.GetCheckpointTs(c.state.Status)
if err := c.checkStaleCheckpointTs(ctx, checkpointTs); err != nil {
return errors.Trace(err)
}

if !c.preflightCheck(captures) {
return nil
}
Expand Down
9 changes: 8 additions & 1 deletion cdc/owner/gc_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,13 @@ const (
// gcSafepointUpdateInterval is the minimum interval at which CDC updates the gc safepoint
var gcSafepointUpdateInterval = 1 * time.Minute

// GcManager is the interface through which changefeed and Owner use the gc
// manager. Holding it by interface instead of as a concrete *gcManager lets
// tests substitute their own implementation. All methods are unexported, so
// implementations must live in this package (or embed a GcManager).
type GcManager interface {
// updateGCSafePoint is given the global reactor state.
// NOTE(review): the implementation is not visible here; presumably it
// advances the gc safepoint based on that state — confirm against gcManager.
updateGCSafePoint(ctx cdcContext.Context, state *model.GlobalReactorState) error
// currentTimeFromPDCached returns the PD physical time; the gcManager
// implementation returns it from a cached field (pdPhysicalTimeCache).
currentTimeFromPDCached(ctx cdcContext.Context) (time.Time, error)
// checkStaleCheckpointTs returns a non-nil error when checkpointTs is
// considered stale. The gcManager tests expect ErrGCTTLExceeded or
// ErrSnapshotLostByGC as the cause of that error.
checkStaleCheckpointTs(ctx cdcContext.Context, checkpointTs model.Ts) error
}

type gcManager struct {
gcTTL int64

Expand Down Expand Up @@ -121,7 +128,7 @@ func (m *gcManager) currentTimeFromPDCached(ctx cdcContext.Context) (time.Time,
return m.pdPhysicalTimeCache, nil
}

func (m *gcManager) CheckStaleCheckpointTs(ctx cdcContext.Context, checkpointTs model.Ts) error {
func (m *gcManager) checkStaleCheckpointTs(ctx cdcContext.Context, checkpointTs model.Ts) error {
if m.isTiCDCBlockGC {
pdTime, err := m.currentTimeFromPDCached(ctx)
if err != nil {
Expand Down
6 changes: 3 additions & 3 deletions cdc/owner/gc_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -158,14 +158,14 @@ func (s *gcManagerSuite) TestCheckStaleCheckpointTs(c *check.C) {
ctx := cdcContext.NewBackendContext4Test(true)
mockPDClient := &mockPDClient{}
ctx.GlobalVars().PDClient = mockPDClient
err := gcManager.CheckStaleCheckpointTs(ctx, 10)
err := gcManager.checkStaleCheckpointTs(ctx, 10)
c.Assert(cerror.ErrGCTTLExceeded.Equal(errors.Cause(err)), check.IsTrue)

err = gcManager.CheckStaleCheckpointTs(ctx, oracle.GoTimeToTS(time.Now()))
err = gcManager.checkStaleCheckpointTs(ctx, oracle.GoTimeToTS(time.Now()))
c.Assert(err, check.IsNil)

gcManager.isTiCDCBlockGC = false
gcManager.lastSafePointTs = 20
err = gcManager.CheckStaleCheckpointTs(ctx, 10)
err = gcManager.checkStaleCheckpointTs(ctx, 10)
c.Assert(cerror.ErrSnapshotLostByGC.Equal(errors.Cause(err)), check.IsTrue)
}
8 changes: 4 additions & 4 deletions cdc/owner/owner.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ import (

type ownerJobType int

// All AdminJob types
// All OwnerJob types
const (
ownerJobTypeRebalance ownerJobType = iota
ownerJobTypeManualSchedule
Expand Down Expand Up @@ -65,7 +65,7 @@ type ownerJob struct {
type Owner struct {
changefeeds map[model.ChangeFeedID]*changefeed

gcManager *gcManager
gcManager GcManager

ownerJobQueueMu sync.Mutex
ownerJobQueue []*ownerJob
Expand All @@ -74,7 +74,7 @@ type Owner struct {

closed int32

newChangefeed func(id model.ChangeFeedID, gcManager *gcManager) *changefeed
newChangefeed func(id model.ChangeFeedID, gcManager GcManager) *changefeed
}

// NewOwner creates a new Owner
Expand All @@ -92,7 +92,7 @@ func NewOwner4Test(
newDDLPuller func(ctx cdcContext.Context, startTs uint64) (DDLPuller, error),
newSink func(ctx cdcContext.Context) (AsyncSink, error)) *Owner {
o := NewOwner()
o.newChangefeed = func(id model.ChangeFeedID, gcManager *gcManager) *changefeed {
o.newChangefeed = func(id model.ChangeFeedID, gcManager GcManager) *changefeed {
return newChangefeed4Test(id, gcManager, newDDLPuller, newSink)
}
return o
Expand Down
39 changes: 39 additions & 0 deletions cdc/owner/owner_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"github.com/pingcap/ticdc/cdc/model"
"github.com/pingcap/ticdc/pkg/config"
cdcContext "github.com/pingcap/ticdc/pkg/context"
cerror "github.com/pingcap/ticdc/pkg/errors"
"github.com/pingcap/ticdc/pkg/etcd"
"github.com/pingcap/ticdc/pkg/orchestrator"
"github.com/pingcap/ticdc/pkg/util/testleak"
Expand All @@ -33,6 +34,16 @@ var _ = check.Suite(&ownerSuite{})
type ownerSuite struct {
}

// mockGcManager wraps a GcManager and overrides checkStaleCheckpointTs so that
// every checkpoint is reported as having exceeded the GC TTL. Every other
// method is served by the embedded GcManager.
type mockGcManager struct {
	GcManager
}

// Compile-time check that *mockGcManager satisfies GcManager.
var _ GcManager = (*mockGcManager)(nil)

// checkStaleCheckpointTs ignores its arguments and always fails with an error
// generated from ErrGCTTLExceeded.
func (m *mockGcManager) checkStaleCheckpointTs(ctx cdcContext.Context, checkpointTs model.Ts) error {
	gcTTLErr := cerror.ErrGCTTLExceeded.GenWithStackByArgs()
	return gcTTLErr
}

func createOwner4Test(ctx cdcContext.Context, c *check.C) (*Owner, *model.GlobalReactorState, *orchestrator.ReactorStateTester) {
ctx.GlobalVars().PDClient = &mockPDClient{updateServiceGCSafePointFunc: func(ctx context.Context, serviceID string, ttl int64, safePoint uint64) (uint64, error) {
return safePoint, nil
Expand Down Expand Up @@ -90,6 +101,34 @@ func (s *ownerSuite) TestCreateRemoveChangefeed(c *check.C) {
c.Assert(err, check.IsNil)
c.Assert(owner.changefeeds, check.Not(check.HasKey), changefeedID)
c.Assert(state.Changefeeds, check.Not(check.HasKey), changefeedID)

tester.MustUpdate(cdcKey.String(), []byte(changefeedStr))
_, err = owner.Tick(ctx, state)
tester.MustApplyPatches()
c.Assert(err, check.IsNil)
c.Assert(owner.changefeeds, check.HasKey, changefeedID)

removeJob := model.AdminJob{
CfID: changefeedID,
Type: model.AdminRemove,
Opts: &model.AdminJobOption{ForceRemove: true},
Error: nil,
}

// this will make changefeed always meet ErrGCTTLExceeded
mockedGcManager := &mockGcManager{GcManager: owner.gcManager}
owner.gcManager = mockedGcManager

// this tick creates the patches that remove the changefeed
owner.EnqueueJob(removeJob)
_, err = owner.Tick(ctx, state)
c.Assert(err, check.IsNil)

// apply the patches and update the owner's in-memory changefeed states
tester.MustApplyPatches()
_, err = owner.Tick(ctx, state)
c.Assert(err, check.IsNil)
c.Assert(owner.changefeeds, check.Not(check.HasKey), changefeedID)
}

func (s *ownerSuite) TestStopChangefeed(c *check.C) {
Expand Down
12 changes: 6 additions & 6 deletions cmd/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ var (

var errOwnerNotFound = liberrors.New("owner not found")

var tsGapWarnning int64 = 86400 * 1000 // 1 day in milliseconds
var tsGapWarning int64 = 86400 * 1000 // 1 day in milliseconds

// Endpoint schemes.
const (
Expand Down Expand Up @@ -164,11 +164,11 @@ func applyAdminChangefeed(ctx context.Context, job model.AdminJob, credential *s
if job.Opts != nil && job.Opts.ForceRemove {
forceRemoveOpt = "true"
}
resp, err := cli.PostForm(addr, url.Values(map[string][]string{
resp, err := cli.PostForm(addr, map[string][]string{
cdc.APIOpVarAdminJob: {fmt.Sprint(int(job.Type))},
cdc.APIOpVarChangefeedID: {job.CfID},
cdc.APIOpForceRemoveChangefeed: {forceRemoveOpt},
}))
})
if err != nil {
return err
}
Expand Down Expand Up @@ -198,9 +198,9 @@ func applyOwnerChangefeedQuery(
if err != nil {
return "", err
}
resp, err := cli.PostForm(addr, url.Values(map[string][]string{
resp, err := cli.PostForm(addr, map[string][]string{
cdc.APIOpVarChangefeedID: {cid},
}))
})
if err != nil {
return "", err
}
Expand Down Expand Up @@ -365,7 +365,7 @@ func confirmLargeDataGap(ctx context.Context, cmd *cobra.Command, startTs uint64
return err
}
tsGap := currentPhysical - oracle.ExtractPhysical(startTs)
if tsGap > tsGapWarnning {
if tsGap > tsGapWarning {
cmd.Printf("Replicate lag (%s) is larger than 1 days, "+
"large data may cause OOM, confirm to continue at your own risk [Y/N]\n",
time.Duration(tsGap)*time.Millisecond,
Expand Down