Skip to content

Commit

Permalink
This is an automated cherry-pick of pingcap#2415
Browse files Browse the repository at this point in the history
Signed-off-by: ti-chi-bot <ti-community-prow-bot@tidb.io>
  • Loading branch information
overvenus authored and ti-chi-bot committed Aug 2, 2021
1 parent c426e49 commit aaad9ca
Show file tree
Hide file tree
Showing 10 changed files with 327 additions and 48 deletions.
4 changes: 2 additions & 2 deletions cdc/processor/pipeline/cyclic_mark_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -144,9 +144,9 @@ func (s *markSuite) TestCyclicMarkNode(c *check.C) {
},
})
n := newCyclicMarkNode(markTableID)
err := n.Init(pipeline.MockNodeContext4Test(ctx, nil, nil))
err := n.Init(pipeline.MockNodeContext4Test(ctx, pipeline.Message{}, nil))
c.Assert(err, check.IsNil)
outputCh := make(chan *pipeline.Message)
outputCh := make(chan pipeline.Message)
var wg sync.WaitGroup
wg.Add(2)
go func() {
Expand Down
54 changes: 54 additions & 0 deletions cdc/processor/pipeline/mounter.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,61 @@ func newMounterNode(mounter entry.Mounter) pipeline.Node {
}

func (n *mounterNode) Init(ctx pipeline.NodeContext) error {
<<<<<<< HEAD
// do nothing
=======
stdCtx, cancel := context.WithCancel(ctx)
n.cancel = cancel

receiver, err := n.notifier.NewReceiver(time.Millisecond * 100)
if err != nil {
log.Panic("unexpected error", zap.Error(err))
}

n.wg.Go(func() error {
defer receiver.Stop()
for {
select {
case <-stdCtx.Done():
return nil
case <-receiver.C:
// handles writes to the queue
for {
n.mu.Lock()
msgs := n.queue.PopManyFront(waitEventMountedBatchSize)
n.mu.Unlock()
if len(msgs) == 0 {
break // inner loop
}

for _, msg := range msgs {
msg := msg.(pipeline.Message)
if msg.Tp != pipeline.MessageTypePolymorphicEvent {
// sends the control message directly to the next node
ctx.SendToNextNode(msg)
continue // to handling the next message
}

// handles PolymorphicEvents
event := msg.PolymorphicEvent
if event.RawKV.OpType != model.OpTypeResolved {
failpoint.Inject("MounterNodeWaitPrepare", func() {})
// only RowChangedEvents need mounting
err := event.WaitPrepare(stdCtx)
if err != nil {
ctx.Throw(err)
return nil
}
}

ctx.SendToNextNode(msg)
}
}
}
}
})

>>>>>>> c60bdb68 (pipeline: use Message value to reduce GC pressure (#2415))
return nil
}

Expand Down
164 changes: 164 additions & 0 deletions cdc/processor/pipeline/mounter_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,164 @@
// Copyright 2021 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.

package pipeline

import (
"context"
"errors"
"sync"
"sync/atomic"
"time"

"github.com/pingcap/check"
"github.com/pingcap/log"
"github.com/pingcap/ticdc/cdc/model"
cdcContext "github.com/pingcap/ticdc/pkg/context"
"github.com/pingcap/ticdc/pkg/pipeline"
"github.com/pingcap/ticdc/pkg/retry"
"github.com/pingcap/ticdc/pkg/util/testleak"
"go.uber.org/zap"
)

type mounterNodeSuite struct{}

var _ = check.Suite(&mounterNodeSuite{})

type checkNode struct {
c *check.C
count int
expected int
}

func (n *checkNode) Init(ctx pipeline.NodeContext) error {
// do nothing
return nil
}

func (n *checkNode) Receive(ctx pipeline.NodeContext) error {
message := ctx.Message()
if message.Tp == pipeline.MessageTypePolymorphicEvent {
if message.PolymorphicEvent.RawKV.OpType == model.OpTypeResolved {
n.c.Assert(n.count, check.Equals, n.expected)
return errors.New("finished")
}
n.c.Assert(message.PolymorphicEvent.Row, check.NotNil)
}

if n.count%100 == 0 {
log.Info("message received", zap.Int("count", n.count))
}

if n.count == basicsTestMessageCount/2 {
log.Info("sleeping for 5 seconds to simulate blocking")
time.Sleep(time.Second * 5)
}
n.count++
return nil
}

func (n *checkNode) Destroy(ctx pipeline.NodeContext) error {
return nil
}

const (
basicsTestMessageCount = 10000
)

func generateMockRawKV(ts uint64) *model.RawKVEntry {
return &model.RawKVEntry{
OpType: model.OpTypePut,
Key: []byte{},
Value: []byte{},
OldValue: nil,
StartTs: ts - 5,
CRTs: ts,
RegionID: 0,
}
}

func (s *mounterNodeSuite) TestMounterNodeBasics(c *check.C) {
defer testleak.AfterTest(c)()

ctx, cancel := cdcContext.WithCancel(cdcContext.NewBackendContext4Test(false))
defer cancel()

ctx = cdcContext.WithErrorHandler(ctx, func(err error) error {
return nil
})
runnersSize, outputChannelSize := 2, 64
p := pipeline.NewPipeline(ctx, 0, runnersSize, outputChannelSize)
mounterNode := newMounterNode()
p.AppendNode(ctx, "mounter", mounterNode)

checkNode := &checkNode{
c: c,
count: 0,
expected: basicsTestMessageCount,
}
p.AppendNode(ctx, "check", checkNode)

var sentCount int64
sendMsg := func(p *pipeline.Pipeline, msg pipeline.Message) {
err := retry.Do(context.Background(), func() error {
return p.SendToFirstNode(msg)
}, retry.WithBackoffBaseDelay(10), retry.WithBackoffMaxDelay(60*1000), retry.WithMaxTries(100))
atomic.AddInt64(&sentCount, 1)
c.Assert(err, check.IsNil)
}

mockMounterInput := make(chan *model.PolymorphicEvent, 10240)
var wg sync.WaitGroup
wg.Add(1)
go func() {
defer wg.Done()
for i := 0; i < basicsTestMessageCount; i++ {
var msg pipeline.Message
if i%100 == 0 {
// generates a control message
msg = pipeline.TickMessage()
} else {
msg = pipeline.PolymorphicEventMessage(model.NewPolymorphicEvent(generateMockRawKV(uint64(i << 5))))
msg.PolymorphicEvent.SetUpFinishedChan()
select {
case <-ctx.Done():
return
case mockMounterInput <- msg.PolymorphicEvent:
}
}
sendMsg(p, msg)
}
msg := pipeline.PolymorphicEventMessage(model.NewResolvedPolymorphicEvent(0, (basicsTestMessageCount<<5)+1))
sendMsg(p, msg)
c.Assert(atomic.LoadInt64(&sentCount), check.Equals, int64(basicsTestMessageCount+1))
log.Info("finished sending")
}()

wg.Add(1)
go func() {
defer wg.Done()
for {
select {
case <-ctx.Done():
return
case event := <-mockMounterInput:
event.Row = &model.RowChangedEvent{} // mocked row
event.PrepareFinished()
}
}
}()

p.Wait()
cancel()
wg.Wait()
}
12 changes: 6 additions & 6 deletions cdc/processor/pipeline/sink_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,7 @@ func (s *outputSuite) TestStatus(c *check.C) {

// test stop at targetTs
node := newSinkNode(&mockSink{}, 0, 10, &mockFlowController{})
c.Assert(node.Init(pipeline.MockNodeContext4Test(ctx, nil, nil)), check.IsNil)
c.Assert(node.Init(pipeline.MockNodeContext4Test(ctx, pipeline.Message{}, nil)), check.IsNil)
c.Assert(node.Status(), check.Equals, TableStatusInitializing)

c.Assert(node.Receive(pipeline.MockNodeContext4Test(ctx, pipeline.BarrierMessage(20), nil)), check.IsNil)
Expand All @@ -145,7 +145,7 @@ func (s *outputSuite) TestStatus(c *check.C) {

// test the stop at ts command
node = newSinkNode(&mockSink{}, 0, 10, &mockFlowController{})
c.Assert(node.Init(pipeline.MockNodeContext4Test(ctx, nil, nil)), check.IsNil)
c.Assert(node.Init(pipeline.MockNodeContext4Test(ctx, pipeline.Message{}, nil)), check.IsNil)
c.Assert(node.Status(), check.Equals, TableStatusInitializing)

c.Assert(node.Receive(pipeline.MockNodeContext4Test(ctx, pipeline.BarrierMessage(20), nil)), check.IsNil)
Expand All @@ -167,7 +167,7 @@ func (s *outputSuite) TestStatus(c *check.C) {

// test the stop at ts command is after then resolvedTs and checkpointTs is greater than stop ts
node = newSinkNode(&mockSink{}, 0, 10, &mockFlowController{})
c.Assert(node.Init(pipeline.MockNodeContext4Test(ctx, nil, nil)), check.IsNil)
c.Assert(node.Init(pipeline.MockNodeContext4Test(ctx, pipeline.Message{}, nil)), check.IsNil)
c.Assert(node.Status(), check.Equals, TableStatusInitializing)

c.Assert(node.Receive(pipeline.MockNodeContext4Test(ctx, pipeline.BarrierMessage(20), nil)), check.IsNil)
Expand Down Expand Up @@ -200,7 +200,7 @@ func (s *outputSuite) TestManyTs(c *check.C) {
})
sink := &mockSink{}
node := newSinkNode(sink, 0, 10, &mockFlowController{})
c.Assert(node.Init(pipeline.MockNodeContext4Test(ctx, nil, nil)), check.IsNil)
c.Assert(node.Init(pipeline.MockNodeContext4Test(ctx, pipeline.Message{}, nil)), check.IsNil)
c.Assert(node.Status(), check.Equals, TableStatusInitializing)

c.Assert(node.Receive(pipeline.MockNodeContext4Test(ctx,
Expand Down Expand Up @@ -259,7 +259,7 @@ func (s *outputSuite) TestSplitUpdateEventWhenEnableOldValue(c *check.C) {
})
sink := &mockSink{}
node := newSinkNode(sink, 0, 10, &mockFlowController{})
c.Assert(node.Init(pipeline.MockNodeContext4Test(ctx, nil, nil)), check.IsNil)
c.Assert(node.Init(pipeline.MockNodeContext4Test(ctx, pipeline.Message{}, nil)), check.IsNil)

// nil row.
c.Assert(node.Receive(pipeline.MockNodeContext4Test(ctx,
Expand Down Expand Up @@ -318,7 +318,7 @@ func (s *outputSuite) TestSplitUpdateEventWhenDisableOldValue(c *check.C) {
})
sink := &mockSink{}
node := newSinkNode(sink, 0, 10, &mockFlowController{})
c.Assert(node.Init(pipeline.MockNodeContext4Test(ctx, nil, nil)), check.IsNil)
c.Assert(node.Init(pipeline.MockNodeContext4Test(ctx, pipeline.Message{}, nil)), check.IsNil)

// nil row.
c.Assert(node.Receive(pipeline.MockNodeContext4Test(ctx,
Expand Down
20 changes: 10 additions & 10 deletions pkg/pipeline/context.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,46 +20,46 @@ type NodeContext interface {
context.Context

// Message returns the message sent by the previous node
Message() *Message
Message() Message
// SendToNextNode sends the message to the next node
SendToNextNode(msg *Message)
SendToNextNode(msg Message)
}

type nodeContext struct {
context.Context
msg *Message
outputCh chan *Message
msg Message
outputCh chan Message
}

func newNodeContext(ctx context.Context, msg *Message, outputCh chan *Message) NodeContext {
func newNodeContext(ctx context.Context, msg Message, outputCh chan Message) NodeContext {
return &nodeContext{
Context: ctx,
msg: msg,
outputCh: outputCh,
}
}

func (ctx *nodeContext) Message() *Message {
func (ctx *nodeContext) Message() Message {
return ctx.msg
}

func (ctx *nodeContext) SendToNextNode(msg *Message) {
func (ctx *nodeContext) SendToNextNode(msg Message) {
// The header channel should never be blocked
ctx.outputCh <- msg
}

type messageContext struct {
NodeContext
message *Message
message Message
}

func withMessage(ctx NodeContext, msg *Message) NodeContext {
func withMessage(ctx NodeContext, msg Message) NodeContext {
return messageContext{
NodeContext: ctx,
message: msg,
}
}

func (ctx messageContext) Message() *Message {
func (ctx messageContext) Message() Message {
return ctx.message
}
24 changes: 10 additions & 14 deletions pkg/pipeline/message.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,39 +41,35 @@ type Message struct {
}

// PolymorphicEventMessage creates the message of PolymorphicEvent
func PolymorphicEventMessage(event *model.PolymorphicEvent) *Message {
return &Message{
func PolymorphicEventMessage(event *model.PolymorphicEvent) Message {
return Message{
Tp: MessageTypePolymorphicEvent,
PolymorphicEvent: event,
}
}

// CommandMessage creates the message of Command
func CommandMessage(command *Command) *Message {
return &Message{
func CommandMessage(command *Command) Message {
return Message{
Tp: MessageTypeCommand,
Command: command,
}
}

// BarrierMessage creates the message of Command
func BarrierMessage(barrierTs model.Ts) *Message {
return &Message{
func BarrierMessage(barrierTs model.Ts) Message {
return Message{
Tp: MessageTypeBarrier,
BarrierTs: barrierTs,
}
}

// TickMessage is called frequently,
// to ease GC pressure we return a global variable.
var tickMsg *Message = &Message{
Tp: MessageTypeTick,
}

// TickMessage creates the message of Tick
// Note: the returned message is READ-ONLY.
func TickMessage() *Message {
return tickMsg
func TickMessage() Message {
return Message{
Tp: MessageTypeTick,
}
}

// CommandType is the type of Command
Expand Down
Loading

0 comments on commit aaad9ca

Please sign in to comment.