Skip to content

Commit

Permalink
fix: [2.4] long buffering causes mq to be unable to receive messages.
Browse files Browse the repository at this point in the history
Signed-off-by: SimFG <bang.fu@zilliz.com>
  • Loading branch information
SimFG committed Sep 23, 2024
1 parent 7ac3ef9 commit 8341e73
Show file tree
Hide file tree
Showing 6 changed files with 50 additions and 20 deletions.
1 change: 1 addition & 0 deletions configs/milvus.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -161,6 +161,7 @@ mq:
enablePursuitMode: true # Default value: "true"
pursuitLag: 10 # time tick lag threshold to enter pursuit mode, in seconds
pursuitBufferSize: 8388608 # pursuit mode buffer size in bytes
pursuitBufferTime: 60 # pursuit mode buffer time in seconds
mqBufSize: 16 # MQ client consumer buffer length
dispatcher:
mergeCheckInterval: 1 # the interval time(in seconds) for dispatcher to check whether to merge
Expand Down
1 change: 1 addition & 0 deletions internal/datacoord/import_util.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (
"go.uber.org/zap"

"github.com/milvus-io/milvus/internal/proto/datapb"
"github.com/milvus-io/milvus/internal/proto/indexpb"
"github.com/milvus-io/milvus/internal/proto/internalpb"
"github.com/milvus-io/milvus/internal/storage"
"github.com/milvus-io/milvus/internal/util/importutilv2"
Expand Down
9 changes: 7 additions & 2 deletions pkg/mq/msgstream/mq_msgstream.go
Original file line number Diff line number Diff line change
Expand Up @@ -617,7 +617,7 @@ func isDMLMsg(msg TsMsg) bool {
return msg.Type() == commonpb.MsgType_Insert || msg.Type() == commonpb.MsgType_Delete
}

func (ms *MqTtMsgStream) continueBuffering(endTs uint64, size uint64) bool {
func (ms *MqTtMsgStream) continueBuffering(endTs, size uint64, startTime time.Time) bool {
if ms.ctx.Err() != nil {
return false
}
Expand All @@ -637,6 +637,10 @@ func (ms *MqTtMsgStream) continueBuffering(endTs uint64, size uint64) bool {
return false
}

if time.Since(startTime) > paramtable.Get().ServiceParam.MQCfg.PursuitBufferTime.GetAsDuration(time.Second) {
return false
}

endTime, _ := tsoutil.ParseTS(endTs)
return time.Since(endTime) > paramtable.Get().ServiceParam.MQCfg.PursuitLag.GetAsDuration(time.Second)
}
Expand Down Expand Up @@ -665,10 +669,11 @@ func (ms *MqTtMsgStream) bufMsgPackToChannel() {
// endMsgPositions := make([]*msgpb.MsgPosition, 0)
startPositions := make(map[string]*msgpb.MsgPosition)
endPositions := make(map[string]*msgpb.MsgPosition)
startBufTime := time.Now()
var endTs uint64
var size uint64

for ms.continueBuffering(endTs, size) {
for ms.continueBuffering(endTs, size, startBufTime) {
ms.consumerLock.Lock()
// wait all channels get ttMsg
for _, consumer := range ms.consumers {
Expand Down
47 changes: 29 additions & 18 deletions pkg/mq/msgstream/mq_msgstream_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1543,36 +1543,47 @@ func TestMqttStream_continueBuffering(t *testing.T) {
Params.Save(Params.ServiceParam.MQCfg.PursuitBufferSize.Key, "1024")

type testCase struct {
tag string
endTs uint64
size uint64
expect bool
tag string
endTs uint64
size uint64
expect bool
startTime time.Time
}

currTs := tsoutil.ComposeTSByTime(time.Now(), 0)
cases := []testCase{
{
tag: "first_run",
endTs: 0,
size: 0,
expect: true,
tag: "first_run",
endTs: 0,
size: 0,
expect: true,
startTime: time.Now(),
},
{
tag: "lag_large",
endTs: 1,
size: 10,
expect: false,
tag: "lag_large",
endTs: 1,
size: 10,
expect: false,
startTime: time.Now(),
},
{
tag: "currTs",
endTs: currTs,
size: 10,
expect: false,
tag: "currTs",
endTs: currTs,
size: 10,
expect: false,
startTime: time.Now(),
},
{
tag: "bufferTs",
endTs: currTs,
size: 10,
expect: true,
startTime: time.Now().Add(-time.Hour),
},
}
for _, tc := range cases {
t.Run(tc.tag, func(t *testing.T) {
assert.Equal(t, tc.expect, ms.continueBuffering(tc.endTs, tc.size))
assert.Equal(t, tc.expect, ms.continueBuffering(tc.endTs, tc.size, tc.startTime))
})
}
})
Expand Down Expand Up @@ -1618,7 +1629,7 @@ func TestMqttStream_continueBuffering(t *testing.T) {
}
for _, tc := range cases {
t.Run(tc.tag, func(t *testing.T) {
assert.Equal(t, tc.expect, ms.continueBuffering(tc.endTs, tc.size))
assert.Equal(t, tc.expect, ms.continueBuffering(tc.endTs, tc.size, time.Now()))
})
}
})
Expand Down
2 changes: 2 additions & 0 deletions pkg/util/paramtable/component_param_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -622,6 +622,8 @@ func TestCachedParam(t *testing.T) {
assert.Equal(t, uint64(8388608), params.ServiceParam.MQCfg.PursuitBufferSize.GetAsUint64())
assert.Equal(t, uint64(8388608), params.ServiceParam.MQCfg.PursuitBufferSize.GetAsUint64())

assert.Equal(t, 60, params.ServiceParam.MQCfg.PursuitBufferTime.GetAsInt())

assert.Equal(t, int64(1024), params.DataCoordCfg.SegmentMaxSize.GetAsInt64())
assert.Equal(t, int64(1024), params.DataCoordCfg.SegmentMaxSize.GetAsInt64())

Expand Down
10 changes: 10 additions & 0 deletions pkg/util/paramtable/service_param.go
Original file line number Diff line number Diff line change
Expand Up @@ -484,6 +484,7 @@ type MQConfig struct {
EnablePursuitMode ParamItem `refreshable:"true"`
PursuitLag ParamItem `refreshable:"true"`
PursuitBufferSize ParamItem `refreshable:"true"`
PursuitBufferTime ParamItem `refreshable:"true"`

MQBufSize ParamItem `refreshable:"false"`
ReceiveBufSize ParamItem `refreshable:"false"`
Expand Down Expand Up @@ -561,6 +562,15 @@ Valid values: [default, pulsar, kafka, rocksmq, natsmq]`,
}
p.PursuitBufferSize.Init(base.mgr)

p.PursuitBufferTime = ParamItem{
Key: "mq.pursuitBufferTime",
Version: "2.4.12",
DefaultValue: "60", // 60 s
Doc: `pursuit mode buffer time in seconds`,
Export: true,
}
p.PursuitBufferTime.Init(base.mgr)

p.MQBufSize = ParamItem{
Key: "mq.mqBufSize",
Version: "2.3.0",
Expand Down

0 comments on commit 8341e73

Please sign in to comment.