From 41c99261bf196adba138c01e7daa68b002ca1dbb Mon Sep 17 00:00:00 2001
From: xufei
Date: Thu, 5 Sep 2024 13:13:03 +0800
Subject: [PATCH] executor: fix index out of range bug in hash join v2 (#55864)

ref pingcap/tidb#53127
---
 pkg/executor/join/base_join_probe.go     |  6 ++----
 pkg/executor/join/join_row_table.go      |  7 +++++++
 pkg/executor/join/join_row_table_test.go | 14 ++++++++++++++
 3 files changed, 23 insertions(+), 4 deletions(-)

diff --git a/pkg/executor/join/base_join_probe.go b/pkg/executor/join/base_join_probe.go
index db3f3fb8dacc2..7088122d05e73 100644
--- a/pkg/executor/join/base_join_probe.go
+++ b/pkg/executor/join/base_join_probe.go
@@ -364,10 +364,8 @@ func (j *baseJoinProbe) appendBuildRowToChunkInternal(chk *chunk.Chunk, usedCols
 		var currentColumn *chunk.Column
 		if ok {
 			currentColumn = chk.Column(indexInDstChk)
-			// Other goroutine will use `atomic.StoreUint32` to write to the first 32 bit in nullmap when it need to set usedFlag
-			// so read from nullMap may meet concurrent write if meta.colOffsetInNullMap == 1 && (columnIndex + meta.colOffsetInNullMap <= 32)
-			mayConcurrentWrite := meta.colOffsetInNullMap == 1 && columnIndex <= 31
-			if !mayConcurrentWrite {
+			readNullMapThreadSafe := meta.isReadNullMapThreadSafe(columnIndex)
+			if readNullMapThreadSafe {
 				for index := range j.cachedBuildRows {
 					currentColumn.AppendNullBitmap(!meta.isColumnNull(*(*unsafe.Pointer)(unsafe.Pointer(&j.cachedBuildRows[index].buildRowStart)), columnIndex))
 					j.cachedBuildRows[index].buildRowOffset = chunk.AppendCellFromRawData(currentColumn, *(*unsafe.Pointer)(unsafe.Pointer(&j.cachedBuildRows[index].buildRowStart)), j.cachedBuildRows[index].buildRowOffset)
diff --git a/pkg/executor/join/join_row_table.go b/pkg/executor/join/join_row_table.go
index 73d70bae9a426..2cdd9ff8caa2a 100644
--- a/pkg/executor/join/join_row_table.go
+++ b/pkg/executor/join/join_row_table.go
@@ -218,6 +218,13 @@ func (meta *TableMeta) getSerializedKeyLength(rowStart unsafe.Pointer) uint64 {
 	return *(*uint64)(unsafe.Add(rowStart, sizeOfNextPtr+meta.nullMapLength))
 }
 
+func (meta *TableMeta) isReadNullMapThreadSafe(columnIndex int) bool {
+	// Other goroutines use `atomic.StoreUint32` to write the first 32 bits of the nullmap when they need to set the usedFlag,
+	// so a read from nullMap may race with a concurrent write if meta.colOffsetInNullMap == 1 && (columnIndex + meta.colOffsetInNullMap < 32)
+	mayConcurrentWrite := meta.colOffsetInNullMap == 1 && columnIndex < 31
+	return !mayConcurrentWrite
+}
+
 // used in tests
 func (meta *TableMeta) getKeyBytes(rowStart unsafe.Pointer) []byte {
 	switch meta.keyMode {
diff --git a/pkg/executor/join/join_row_table_test.go b/pkg/executor/join/join_row_table_test.go
index f82e64765fc5b..721f572e939b6 100644
--- a/pkg/executor/join/join_row_table_test.go
+++ b/pkg/executor/join/join_row_table_test.go
@@ -184,6 +184,20 @@ func TestJoinTableMetaKeyInlinedAndFixed(t *testing.T) {
 	}
 }
 
+func TestReadNullMapThreadSafe(t *testing.T) {
+	// meta with usedFlag
+	tinyTp := types.NewFieldType(mysql.TypeTiny)
+	metaWithUsedFlag := newTableMeta([]int{0}, []*types.FieldType{tinyTp}, []*types.FieldType{tinyTp}, []*types.FieldType{tinyTp}, nil, []int{}, true)
+	for columnIndex := 0; columnIndex < 100; columnIndex++ {
+		require.Equal(t, columnIndex >= 31, metaWithUsedFlag.isReadNullMapThreadSafe(columnIndex))
+	}
+	// meta without usedFlag
+	metaWithoutUsedFlag := newTableMeta([]int{0}, []*types.FieldType{tinyTp}, []*types.FieldType{tinyTp}, []*types.FieldType{tinyTp}, nil, []int{}, false)
+	for columnIndex := 0; columnIndex < 100; columnIndex++ {
+		require.Equal(t, true, metaWithoutUsedFlag.isReadNullMapThreadSafe(columnIndex))
+	}
+}
+
 func TestJoinTableMetaSerializedMode(t *testing.T) {
 	intTp := types.NewFieldType(mysql.TypeLonglong)
 	uintTp := types.NewFieldType(mysql.TypeLonglong)
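
Note on the boundary: per the comments in this patch, when meta.colOffsetInNullMap == 1
the first bit of the nullmap holds the usedFlag, and other goroutines set it with
atomic.StoreUint32 over the whole first 32-bit word. Column columnIndex's null bit sits
at position columnIndex + colOffsetInNullMap, so a plain read can race only while that
bit falls inside bits 0..31. The standalone sketch below reproduces just that
arithmetic; isReadSafe is an illustrative stand-in written for this note, not the TiDB
function, and the layout assumption (usedFlag in bit 0 when the offset is 1) is taken
from the comments above.

package main

import "fmt"

// isReadSafe mirrors the reasoning behind isReadNullMapThreadSafe: with
// colOffsetInNullMap == 1, bit 0 of the nullmap holds the usedFlag and the
// first 32-bit word is written via atomic.StoreUint32, so reading column
// columnIndex's null bit (at position columnIndex + colOffsetInNullMap)
// without synchronization is safe only outside bits 0..31.
func isReadSafe(colOffsetInNullMap, columnIndex int) bool {
	mayConcurrentWrite := colOffsetInNullMap == 1 && columnIndex+colOffsetInNullMap < 32
	return !mayConcurrentWrite
}

func main() {
	// With a usedFlag (offset 1): columns 0..30 land in the atomically
	// written word (bits 1..31); column 31 is the first safe one (bit 32).
	for _, columnIndex := range []int{0, 30, 31, 32} {
		fmt.Printf("usedFlag, column %2d: safe=%v\n", columnIndex, isReadSafe(1, columnIndex))
	}
	// Without a usedFlag (offset 0) nothing writes the nullmap concurrently,
	// so every column is safe to read.
	fmt.Println("no usedFlag, column 0: safe =", isReadSafe(0, 0))
}

Run as-is, this prints safe=false for columns 0 and 30 and safe=true from column 31
onward, which is exactly the boundary TestReadNullMapThreadSafe asserts.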