Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

*: use values to reduce GC pressure (#2474) #2497

Merged
39 changes: 21 additions & 18 deletions cdc/kv/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -147,10 +147,13 @@ type regionEvent struct {
resolvedTs *cdcpb.ResolvedTs
}

// A special event that indicates singleEventFeed is closed.
var emptyRegionEvent = regionEvent{}

type regionFeedState struct {
sri singleRegionInfo
requestID uint64
regionEventCh chan *regionEvent
regionEventCh chan regionEvent
stopped int32

lock sync.RWMutex
Expand All @@ -164,7 +167,7 @@ func newRegionFeedState(sri singleRegionInfo, requestID uint64) *regionFeedState
return &regionFeedState{
sri: sri,
requestID: requestID,
regionEventCh: make(chan *regionEvent, 16),
regionEventCh: make(chan regionEvent, 16),
stopped: 0,
}
}
Expand Down Expand Up @@ -352,7 +355,7 @@ type CDCKVClient interface {
enableOldValue bool,
lockResolver txnutil.LockResolver,
isPullerInit PullerInitialization,
eventCh chan<- *model.RegionFeedEvent,
eventCh chan<- model.RegionFeedEvent,
) error
Close() error
}
Expand Down Expand Up @@ -492,7 +495,7 @@ func (c *CDCClient) EventFeed(
enableOldValue bool,
lockResolver txnutil.LockResolver,
isPullerInit PullerInitialization,
eventCh chan<- *model.RegionFeedEvent,
eventCh chan<- model.RegionFeedEvent,
) error {
s := newEventFeedSession(ctx, c, c.regionCache, c.kvStorage, span,
lockResolver, isPullerInit,
Expand Down Expand Up @@ -523,7 +526,7 @@ type eventFeedSession struct {
totalSpan regionspan.ComparableSpan

// The channel to send the processed events.
eventCh chan<- *model.RegionFeedEvent
eventCh chan<- model.RegionFeedEvent
// The token based region router, it controls the uninitialized regions with
// a given size limit.
regionRouter LimitRegionRouter
Expand Down Expand Up @@ -565,7 +568,7 @@ func newEventFeedSession(
isPullerInit PullerInitialization,
enableOldValue bool,
startTs uint64,
eventCh chan<- *model.RegionFeedEvent,
eventCh chan<- model.RegionFeedEvent,
) *eventFeedSession {
id := strconv.FormatUint(allocID(), 10)
kvClientCfg := config.GetGlobalServerConfig().KVClient
Expand Down Expand Up @@ -950,7 +953,7 @@ func (s *eventFeedSession) dispatchRequest(
// distribution in puller, so this resolved ts event is needed.
// After this resolved ts event is sent, we don't need to send one more
// resolved ts event when the region starts to work.
resolvedEv := &model.RegionFeedEvent{
resolvedEv := model.RegionFeedEvent{
RegionID: sri.verID.GetID(),
Resolved: &model.ResolvedSpan{
Span: sri.span,
Expand Down Expand Up @@ -1291,7 +1294,7 @@ func (s *eventFeedSession) receiveFromStream(

for _, state := range regionStates {
select {
case state.regionEventCh <- nil:
case state.regionEventCh <- emptyRegionEvent:
case <-ctx.Done():
return ctx.Err()
}
Expand Down Expand Up @@ -1390,7 +1393,7 @@ func (s *eventFeedSession) sendRegionChangeEvent(
}

select {
case state.regionEventCh <- &regionEvent{
case state.regionEventCh <- regionEvent{
changeEvent: event,
}:
case <-ctx.Done():
Expand All @@ -1416,7 +1419,7 @@ func (s *eventFeedSession) sendResolvedTs(
continue
}
select {
case state.regionEventCh <- &regionEvent{
case state.regionEventCh <- regionEvent{
resolvedTs: resolvedTs,
}:
case <-ctx.Done():
Expand All @@ -1438,7 +1441,7 @@ func (s *eventFeedSession) singleEventFeed(
span regionspan.ComparableSpan,
startTs uint64,
storeAddr string,
receiverCh <-chan *regionEvent,
receiverCh <-chan regionEvent,
) (lastResolvedTs uint64, initialized bool, err error) {
captureAddr := util.CaptureAddrFromCtx(ctx)
changefeedID := util.ChangefeedIDFromCtx(ctx)
Expand Down Expand Up @@ -1471,7 +1474,7 @@ func (s *eventFeedSession) singleEventFeed(
return nil
}
// emit a checkpointTs
revent := &model.RegionFeedEvent{
revent := model.RegionFeedEvent{
RegionID: regionID,
Resolved: &model.ResolvedSpan{
Span: span,
Expand All @@ -1495,7 +1498,7 @@ func (s *eventFeedSession) singleEventFeed(
})

for {
var event *regionEvent
var event regionEvent
var ok bool
select {
case <-ctx.Done():
Expand Down Expand Up @@ -1543,12 +1546,12 @@ func (s *eventFeedSession) singleEventFeed(
case event, ok = <-receiverCh:
}

if !ok || event == nil {
if !ok || event == emptyRegionEvent {
log.Debug("singleEventFeed closed by error")
err = cerror.ErrEventFeedAborted.GenWithStackByArgs()
return
}
var revent *model.RegionFeedEvent
var revent model.RegionFeedEvent
lastReceivedEventTime = time.Now()
if event.changeEvent != nil {
metricEventSize.Observe(float64(event.changeEvent.Event.Size()))
Expand Down Expand Up @@ -1703,18 +1706,18 @@ func (s *eventFeedSession) getStreamCancel(storeAddr string) (cancel context.Can
return
}

func assembleRowEvent(regionID uint64, entry *cdcpb.Event_Row, enableOldValue bool) (*model.RegionFeedEvent, error) {
func assembleRowEvent(regionID uint64, entry *cdcpb.Event_Row, enableOldValue bool) (model.RegionFeedEvent, error) {
var opType model.OpType
switch entry.GetOpType() {
case cdcpb.Event_Row_DELETE:
opType = model.OpTypeDelete
case cdcpb.Event_Row_PUT:
opType = model.OpTypePut
default:
return nil, cerror.ErrUnknownKVEventType.GenWithStackByArgs(entry.GetOpType(), entry)
return model.RegionFeedEvent{}, cerror.ErrUnknownKVEventType.GenWithStackByArgs(entry.GetOpType(), entry)
}

revent := &model.RegionFeedEvent{
revent := model.RegionFeedEvent{
RegionID: regionID,
Val: &model.RawKVEntry{
OpType: opType,
Expand Down
8 changes: 4 additions & 4 deletions cdc/kv/client_bench_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ func prepareBenchMultiStore(b *testing.B, storeNum, regionNum int) (
*sync.Map, /* regionID -> requestID/storeID */
*sync.WaitGroup, /* ensure eventfeed routine exit */
context.CancelFunc, /* cancle both mock server and cdc kv client */
chan *model.RegionFeedEvent, /* kv client output channel */
chan model.RegionFeedEvent, /* kv client output channel */
[]chan *cdcpb.ChangeDataEvent, /* mock server data channels */
) {
ctx, cancel := context.WithCancel(context.Background())
Expand Down Expand Up @@ -189,7 +189,7 @@ func prepareBenchMultiStore(b *testing.B, storeNum, regionNum int) (
lockresolver := txnutil.NewLockerResolver(kvStorage)
isPullInit := &mockPullerInit{}
cdcClient := NewCDCClient(ctx, pdClient, kvStorage, &security.Credential{})
eventCh := make(chan *model.RegionFeedEvent, 1000000)
eventCh := make(chan model.RegionFeedEvent, 1000000)
wg.Add(1)
go func() {
err := cdcClient.EventFeed(ctx, regionspan.ComparableSpan{Start: []byte("a"), End: []byte("b")}, 100, false, lockresolver, isPullInit, eventCh)
Expand Down Expand Up @@ -223,7 +223,7 @@ func prepareBench(b *testing.B, regionNum int) (
*sync.Map, /* regionID -> requestID */
*sync.WaitGroup, /* ensure eventfeed routine exit */
context.CancelFunc, /* cancle both mock server and cdc kv client */
chan *model.RegionFeedEvent, /* kv client output channel */
chan model.RegionFeedEvent, /* kv client output channel */
chan *cdcpb.ChangeDataEvent, /* mock server data channel */
) {
ctx, cancel := context.WithCancel(context.Background())
Expand Down Expand Up @@ -277,7 +277,7 @@ func prepareBench(b *testing.B, regionNum int) (
lockresolver := txnutil.NewLockerResolver(kvStorage)
isPullInit := &mockPullerInit{}
cdcClient := NewCDCClient(ctx, pdClient, kvStorage, &security.Credential{})
eventCh := make(chan *model.RegionFeedEvent, 1000000)
eventCh := make(chan model.RegionFeedEvent, 1000000)
wg.Add(1)
go func() {
err := cdcClient.EventFeed(ctx, regionspan.ComparableSpan{Start: []byte("a"), End: []byte("z")}, 100, false, lockresolver, isPullInit, eventCh)
Expand Down
Loading