Skip to content

Commit

Permalink
executor: fix index out of range bug in hash join v2 (pingcap#55864)
Browse files Browse the repository at this point in the history
  • Loading branch information
windtalker committed Sep 5, 2024
1 parent edf1001 commit 41c9926
Show file tree
Hide file tree
Showing 3 changed files with 23 additions and 4 deletions.
6 changes: 2 additions & 4 deletions pkg/executor/join/base_join_probe.go
Original file line number Diff line number Diff line change
Expand Up @@ -364,10 +364,8 @@ func (j *baseJoinProbe) appendBuildRowToChunkInternal(chk *chunk.Chunk, usedCols
var currentColumn *chunk.Column
if ok {
currentColumn = chk.Column(indexInDstChk)
// Other goroutine will use `atomic.StoreUint32` to write to the first 32 bit in nullmap when it need to set usedFlag
// so read from nullMap may meet concurrent write if meta.colOffsetInNullMap == 1 && (columnIndex + meta.colOffsetInNullMap <= 32)
mayConcurrentWrite := meta.colOffsetInNullMap == 1 && columnIndex <= 31
if !mayConcurrentWrite {
readNullMapThreadSafe := meta.isReadNullMapThreadSafe(columnIndex)
if readNullMapThreadSafe {
for index := range j.cachedBuildRows {
currentColumn.AppendNullBitmap(!meta.isColumnNull(*(*unsafe.Pointer)(unsafe.Pointer(&j.cachedBuildRows[index].buildRowStart)), columnIndex))
j.cachedBuildRows[index].buildRowOffset = chunk.AppendCellFromRawData(currentColumn, *(*unsafe.Pointer)(unsafe.Pointer(&j.cachedBuildRows[index].buildRowStart)), j.cachedBuildRows[index].buildRowOffset)
Expand Down
7 changes: 7 additions & 0 deletions pkg/executor/join/join_row_table.go
Original file line number Diff line number Diff line change
Expand Up @@ -218,6 +218,13 @@ func (meta *TableMeta) getSerializedKeyLength(rowStart unsafe.Pointer) uint64 {
return *(*uint64)(unsafe.Add(rowStart, sizeOfNextPtr+meta.nullMapLength))
}

func (meta *TableMeta) isReadNullMapThreadSafe(columnIndex int) bool {
// Other goroutine will use `atomic.StoreUint32` to write to the first 32 bit in nullmap when it need to set usedFlag
// so read from nullMap may meet concurrent write if meta.colOffsetInNullMap == 1 && (columnIndex + meta.colOffsetInNullMap < 32)
mayConcurrentWrite := meta.colOffsetInNullMap == 1 && columnIndex < 31
return !mayConcurrentWrite
}

// used in tests
func (meta *TableMeta) getKeyBytes(rowStart unsafe.Pointer) []byte {
switch meta.keyMode {
Expand Down
14 changes: 14 additions & 0 deletions pkg/executor/join/join_row_table_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -184,6 +184,20 @@ func TestJoinTableMetaKeyInlinedAndFixed(t *testing.T) {
}
}

func TestReadNullMapThreadSafe(t *testing.T) {
// meta with usedFlag
tinyTp := types.NewFieldType(mysql.TypeTiny)
metaWithUsedFlag := newTableMeta([]int{0}, []*types.FieldType{tinyTp}, []*types.FieldType{tinyTp}, []*types.FieldType{tinyTp}, nil, []int{}, true)
for columnIndex := 0; columnIndex < 100; columnIndex++ {
require.Equal(t, columnIndex >= 31, metaWithUsedFlag.isReadNullMapThreadSafe(columnIndex))
}
// meta without usedFlag
metaWithoutUsedFlag := newTableMeta([]int{0}, []*types.FieldType{tinyTp}, []*types.FieldType{tinyTp}, []*types.FieldType{tinyTp}, nil, []int{}, false)
for columnIndex := 0; columnIndex < 100; columnIndex++ {
require.Equal(t, true, metaWithoutUsedFlag.isReadNullMapThreadSafe(columnIndex))
}
}

func TestJoinTableMetaSerializedMode(t *testing.T) {
intTp := types.NewFieldType(mysql.TypeLonglong)
uintTp := types.NewFieldType(mysql.TypeLonglong)
Expand Down

0 comments on commit 41c9926

Please sign in to comment.