Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

pipeline: use Message value to reduce GC pressure #2415

Merged
merged 7 commits into from
Aug 2, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions cdc/processor/pipeline/cyclic_mark_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -144,9 +144,9 @@ func (s *markSuite) TestCyclicMarkNode(c *check.C) {
},
})
n := newCyclicMarkNode(markTableID)
err := n.Init(pipeline.MockNodeContext4Test(ctx, nil, nil))
err := n.Init(pipeline.MockNodeContext4Test(ctx, pipeline.Message{}, nil))
c.Assert(err, check.IsNil)
outputCh := make(chan *pipeline.Message)
outputCh := make(chan pipeline.Message)
var wg sync.WaitGroup
wg.Add(2)
go func() {
Expand Down
2 changes: 1 addition & 1 deletion cdc/processor/pipeline/mounter.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ func (n *mounterNode) Init(ctx pipeline.NodeContext) error {
}

for _, msg := range msgs {
msg := msg.(*pipeline.Message)
msg := msg.(pipeline.Message)
if msg.Tp != pipeline.MessageTypePolymorphicEvent {
// sends the control message directly to the next node
ctx.SendToNextNode(msg)
Expand Down
4 changes: 2 additions & 2 deletions cdc/processor/pipeline/mounter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,7 @@ func (s *mounterNodeSuite) TestMounterNodeBasics(c *check.C) {
p.AppendNode(ctx, "check", checkNode)

var sentCount int64
sendMsg := func(p *pipeline.Pipeline, msg *pipeline.Message) {
sendMsg := func(p *pipeline.Pipeline, msg pipeline.Message) {
err := retry.Do(context.Background(), func() error {
return p.SendToFirstNode(msg)
}, retry.WithBackoffBaseDelay(10), retry.WithBackoffMaxDelay(60*1000), retry.WithMaxTries(100))
Expand All @@ -123,7 +123,7 @@ func (s *mounterNodeSuite) TestMounterNodeBasics(c *check.C) {
go func() {
defer wg.Done()
for i := 0; i < basicsTestMessageCount; i++ {
var msg *pipeline.Message
var msg pipeline.Message
if i%100 == 0 {
// generates a control message
msg = pipeline.TickMessage()
Expand Down
12 changes: 6 additions & 6 deletions cdc/processor/pipeline/sink_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,7 @@ func (s *outputSuite) TestStatus(c *check.C) {

// test stop at targetTs
node := newSinkNode(&mockSink{}, 0, 10, &mockFlowController{})
c.Assert(node.Init(pipeline.MockNodeContext4Test(ctx, nil, nil)), check.IsNil)
c.Assert(node.Init(pipeline.MockNodeContext4Test(ctx, pipeline.Message{}, nil)), check.IsNil)
c.Assert(node.Status(), check.Equals, TableStatusInitializing)

c.Assert(node.Receive(pipeline.MockNodeContext4Test(ctx, pipeline.BarrierMessage(20), nil)), check.IsNil)
Expand All @@ -145,7 +145,7 @@ func (s *outputSuite) TestStatus(c *check.C) {

// test the stop at ts command
node = newSinkNode(&mockSink{}, 0, 10, &mockFlowController{})
c.Assert(node.Init(pipeline.MockNodeContext4Test(ctx, nil, nil)), check.IsNil)
c.Assert(node.Init(pipeline.MockNodeContext4Test(ctx, pipeline.Message{}, nil)), check.IsNil)
c.Assert(node.Status(), check.Equals, TableStatusInitializing)

c.Assert(node.Receive(pipeline.MockNodeContext4Test(ctx, pipeline.BarrierMessage(20), nil)), check.IsNil)
Expand All @@ -167,7 +167,7 @@ func (s *outputSuite) TestStatus(c *check.C) {

// test the stop at ts command is after then resolvedTs and checkpointTs is greater than stop ts
node = newSinkNode(&mockSink{}, 0, 10, &mockFlowController{})
c.Assert(node.Init(pipeline.MockNodeContext4Test(ctx, nil, nil)), check.IsNil)
c.Assert(node.Init(pipeline.MockNodeContext4Test(ctx, pipeline.Message{}, nil)), check.IsNil)
c.Assert(node.Status(), check.Equals, TableStatusInitializing)

c.Assert(node.Receive(pipeline.MockNodeContext4Test(ctx, pipeline.BarrierMessage(20), nil)), check.IsNil)
Expand Down Expand Up @@ -200,7 +200,7 @@ func (s *outputSuite) TestManyTs(c *check.C) {
})
sink := &mockSink{}
node := newSinkNode(sink, 0, 10, &mockFlowController{})
c.Assert(node.Init(pipeline.MockNodeContext4Test(ctx, nil, nil)), check.IsNil)
c.Assert(node.Init(pipeline.MockNodeContext4Test(ctx, pipeline.Message{}, nil)), check.IsNil)
c.Assert(node.Status(), check.Equals, TableStatusInitializing)

c.Assert(node.Receive(pipeline.MockNodeContext4Test(ctx,
Expand Down Expand Up @@ -257,7 +257,7 @@ func (s *outputSuite) TestSplitUpdateEventWhenEnableOldValue(c *check.C) {
})
sink := &mockSink{}
node := newSinkNode(sink, 0, 10, &mockFlowController{})
c.Assert(node.Init(pipeline.MockNodeContext4Test(ctx, nil, nil)), check.IsNil)
c.Assert(node.Init(pipeline.MockNodeContext4Test(ctx, pipeline.Message{}, nil)), check.IsNil)

// nil row.
c.Assert(node.Receive(pipeline.MockNodeContext4Test(ctx,
Expand Down Expand Up @@ -316,7 +316,7 @@ func (s *outputSuite) TestSplitUpdateEventWhenDisableOldValue(c *check.C) {
})
sink := &mockSink{}
node := newSinkNode(sink, 0, 10, &mockFlowController{})
c.Assert(node.Init(pipeline.MockNodeContext4Test(ctx, nil, nil)), check.IsNil)
c.Assert(node.Init(pipeline.MockNodeContext4Test(ctx, pipeline.Message{}, nil)), check.IsNil)

// nil row.
c.Assert(node.Receive(pipeline.MockNodeContext4Test(ctx,
Expand Down
20 changes: 10 additions & 10 deletions pkg/pipeline/context.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,46 +20,46 @@ type NodeContext interface {
context.Context

// Message returns the message sent by the previous node
Message() *Message
Message() Message
// SendToNextNode sends the message to the next node
SendToNextNode(msg *Message)
SendToNextNode(msg Message)
}

type nodeContext struct {
context.Context
msg *Message
outputCh chan *Message
msg Message
outputCh chan Message
}

func newNodeContext(ctx context.Context, msg *Message, outputCh chan *Message) NodeContext {
func newNodeContext(ctx context.Context, msg Message, outputCh chan Message) NodeContext {
return &nodeContext{
Context: ctx,
msg: msg,
outputCh: outputCh,
}
}

func (ctx *nodeContext) Message() *Message {
func (ctx *nodeContext) Message() Message {
return ctx.msg
}

func (ctx *nodeContext) SendToNextNode(msg *Message) {
func (ctx *nodeContext) SendToNextNode(msg Message) {
// The header channel should never be blocked
ctx.outputCh <- msg
}

type messageContext struct {
NodeContext
message *Message
message Message
}

func withMessage(ctx NodeContext, msg *Message) NodeContext {
func withMessage(ctx NodeContext, msg Message) NodeContext {
return messageContext{
NodeContext: ctx,
message: msg,
}
}

func (ctx messageContext) Message() *Message {
func (ctx messageContext) Message() Message {
return ctx.message
}
24 changes: 10 additions & 14 deletions pkg/pipeline/message.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,39 +41,35 @@ type Message struct {
}

// PolymorphicEventMessage creates the message of PolymorphicEvent
func PolymorphicEventMessage(event *model.PolymorphicEvent) *Message {
return &Message{
func PolymorphicEventMessage(event *model.PolymorphicEvent) Message {
return Message{
Tp: MessageTypePolymorphicEvent,
PolymorphicEvent: event,
}
}

// CommandMessage creates the message of Command
func CommandMessage(command *Command) *Message {
return &Message{
func CommandMessage(command *Command) Message {
return Message{
Tp: MessageTypeCommand,
Command: command,
}
}

// BarrierMessage creates the message of Command
func BarrierMessage(barrierTs model.Ts) *Message {
return &Message{
func BarrierMessage(barrierTs model.Ts) Message {
return Message{
Tp: MessageTypeBarrier,
BarrierTs: barrierTs,
}
}

// TickMessage is called frequently,
// to ease GC pressure we return a global variable.
var tickMsg *Message = &Message{
Tp: MessageTypeTick,
}

// TickMessage creates the message of Tick
// Note: the returned message is READ-ONLY.
func TickMessage() *Message {
return tickMsg
func TickMessage() Message {
return Message{
Tp: MessageTypeTick,
}
}

// CommandType is the type of Command
Expand Down
2 changes: 1 addition & 1 deletion pkg/pipeline/pipeline.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ func (p *Pipeline) driveRunner(ctx context.Context, previousRunner, runner runne
var pipelineTryAgainError error = cerror.ErrPipelineTryAgain.FastGenByArgs()

// SendToFirstNode sends the message to the first node
func (p *Pipeline) SendToFirstNode(msg *Message) error {
func (p *Pipeline) SendToFirstNode(msg Message) error {
p.closeMu.Lock()
defer p.closeMu.Unlock()
if p.isClosed {
Expand Down
73 changes: 69 additions & 4 deletions pkg/pipeline/pipeline_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ package pipeline

import (
stdCtx "context"
"fmt"
"testing"
"time"

Expand Down Expand Up @@ -76,7 +77,7 @@ func (e echoNode) Destroy(ctx NodeContext) error {

type checkNode struct {
c *check.C
expected []*Message
expected []Message
index int
}

Expand Down Expand Up @@ -112,7 +113,7 @@ func (s *pipelineSuite) TestPipelineUsage(c *check.C) {
p.AppendNode(ctx, "echo node", echoNode{})
p.AppendNode(ctx, "check node", &checkNode{
c: c,
expected: []*Message{
expected: []Message{
PolymorphicEventMessage(&model.PolymorphicEvent{
Row: &model.RowChangedEvent{
Table: &model.TableName{
Expand Down Expand Up @@ -225,7 +226,7 @@ func (s *pipelineSuite) TestPipelineError(c *check.C) {
p.AppendNode(ctx, "error node", &errorNode{c: c})
p.AppendNode(ctx, "check node", &checkNode{
c: c,
expected: []*Message{
expected: []Message{
PolymorphicEventMessage(&model.PolymorphicEvent{
Row: &model.RowChangedEvent{
Table: &model.TableName{
Expand Down Expand Up @@ -380,7 +381,7 @@ func (s *pipelineSuite) TestPipelineAppendNode(c *check.C) {

p.AppendNode(ctx, "check node", &checkNode{
c: c,
expected: []*Message{
expected: []Message{
PolymorphicEventMessage(&model.PolymorphicEvent{
Row: &model.RowChangedEvent{
Table: &model.TableName{
Expand Down Expand Up @@ -475,3 +476,67 @@ func (s *pipelineSuite) TestPipelinePanic(c *check.C) {
p.AppendNode(ctx, "panic", panicNode{})
p.Wait()
}

type forward struct {
ch chan Message
}

func (n *forward) Init(ctx NodeContext) error {
return nil
}

func (n *forward) Receive(ctx NodeContext) error {
m := ctx.Message()
if n.ch != nil {
n.ch <- m
} else {
ctx.SendToNextNode(m)
}
return nil
}

func (n *forward) Destroy(ctx NodeContext) error {
return nil
}

// Run the benchmark
// go test -benchmem -run=^$ -bench ^(BenchmarkPipeline)$ github.com/pingcap/ticdc/pkg/pipeline
func BenchmarkPipeline(b *testing.B) {
ctx := context.NewContext(stdCtx.Background(), &context.GlobalVars{})
runnersSize, outputChannelSize := 2, 64

b.Run("BenchmarkPipeline", func(b *testing.B) {
for i := 1; i <= 8; i++ {
ctx, cancel := context.WithCancel(ctx)
ctx = context.WithErrorHandler(ctx, func(err error) error {
b.Fatal(err)
return err
})

ch := make(chan Message)
p := NewPipeline(ctx, -1, runnersSize, outputChannelSize)
for j := 0; j < i; j++ {
if (j + 1) == i {
// The last node
p.AppendNode(ctx, "forward node", &forward{ch: ch})
} else {
p.AppendNode(ctx, "forward node", &forward{})
}
}

b.ResetTimer()
b.Run(fmt.Sprintf("%d node(s)", i), func(b *testing.B) {
for i := 0; i < b.N; i++ {
err := p.SendToFirstNode(BarrierMessage(1))
if err != nil {
b.Fatal(err)
}
<-ch
}
})
b.StopTimer()
cancel()
p.Wait()
}
})
}
14 changes: 7 additions & 7 deletions pkg/pipeline/runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ import (

type runner interface {
run(ctx context.Context) error
getOutputCh() chan *Message
getOutputCh() chan Message
getNode() Node
getName() string
}
Expand All @@ -30,20 +30,20 @@ type nodeRunner struct {
name string
node Node
previous runner
outputCh chan *Message
outputCh chan Message
}

func newNodeRunner(name string, node Node, previous runner, outputChanSize int) *nodeRunner {
return &nodeRunner{
name: name,
node: node,
previous: previous,
outputCh: make(chan *Message, outputChanSize),
outputCh: make(chan Message, outputChanSize),
}
}

func (r *nodeRunner) run(ctx context.Context) error {
nodeCtx := newNodeContext(ctx, nil, r.outputCh)
nodeCtx := newNodeContext(ctx, Message{}, r.outputCh)
defer close(r.outputCh)
defer func() {
err := r.node.Destroy(nodeCtx)
Expand All @@ -65,7 +65,7 @@ func (r *nodeRunner) run(ctx context.Context) error {
return nil
}

func (r *nodeRunner) getOutputCh() chan *Message {
func (r *nodeRunner) getOutputCh() chan Message {
return r.outputCh
}

Expand All @@ -77,7 +77,7 @@ func (r *nodeRunner) getName() string {
return r.name
}

type headRunner chan *Message
type headRunner chan Message

func (h headRunner) getName() string {
return "header"
Expand All @@ -87,7 +87,7 @@ func (h headRunner) run(ctx context.Context) error {
panic("unreachable")
}

func (h headRunner) getOutputCh() chan *Message {
func (h headRunner) getOutputCh() chan Message {
return h
}

Expand Down
Loading