diff --git a/executor/joiner.go b/executor/joiner.go
index 870669268d064..5afa69676e34b 100644
--- a/executor/joiner.go
+++ b/executor/joiner.go
@@ -158,19 +158,18 @@ func (j *baseJoiner) makeShallowJoinRow(isRightJoin bool, inner, outer chunk.Row
 	j.shallowRow.ShallowCopyPartialRow(inner.Len(), outer)
 }
 
-func (j *baseJoiner) filter(input, output *chunk.Chunk) (matched bool, err error) {
+func (j *baseJoiner) filter(input, output *chunk.Chunk, outerColsLen int) (bool, error) {
+	var err error
 	j.selected, err = expression.VectorizedFilter(j.ctx, j.conditions, chunk.NewIterator4Chunk(input), j.selected)
 	if err != nil {
 		return false, errors.Trace(err)
 	}
-	for i := 0; i < len(j.selected); i++ {
-		if !j.selected[i] {
-			continue
-		}
-		matched = true
-		output.AppendRow(input.GetRow(i))
+	// Batch-copy the selected rows into the output chunk.
+	innerColOffset, outerColOffset := 0, input.NumCols()-outerColsLen
+	if !j.outerIsRight {
+		innerColOffset, outerColOffset = outerColsLen, 0
 	}
-	return matched, nil
+	return chunk.CopySelectedJoinRows(input, innerColOffset, outerColOffset, j.selected, output), nil
 }
 
 type semiJoiner struct {
@@ -350,8 +349,11 @@ func (j *leftOuterJoiner) tryToMatch(outer chunk.Row, inners chunk.Iterator, chk
 	}
 
 	// reach here, chkForJoin is j.chk
-	matched, err := j.filter(chkForJoin, chk)
-	return matched, errors.Trace(err)
+	matched, err := j.filter(chkForJoin, chk, outer.Len())
+	if err != nil {
+		return false, errors.Trace(err)
+	}
+	return matched, nil
 }
 
 func (j *leftOuterJoiner) onMissMatch(outer chunk.Row, chk *chunk.Chunk) {
@@ -384,9 +386,11 @@ func (j *rightOuterJoiner) tryToMatch(outer chunk.Row, inners chunk.Iterator, ch
 		return true, nil
 	}
 
-	// reach here, chkForJoin is j.chk
-	matched, err := j.filter(chkForJoin, chk)
-	return matched, errors.Trace(err)
+	matched, err := j.filter(chkForJoin, chk, outer.Len())
+	if err != nil {
+		return false, errors.Trace(err)
+	}
+	return matched, nil
 }
 
 func (j *rightOuterJoiner) onMissMatch(outer chunk.Row, chk *chunk.Chunk) {
@@ -421,8 +425,12 @@ func (j *innerJoiner) tryToMatch(outer chunk.Row, inners chunk.Iterator, chk *ch
 	}
 
 	// reach here, chkForJoin is j.chk
-	matched, err := j.filter(chkForJoin, chk)
-	return matched, errors.Trace(err)
+	matched, err := j.filter(chkForJoin, chk, outer.Len())
+	if err != nil {
+		return false, errors.Trace(err)
+	}
+	return matched, nil
+
 }
 
 func (j *innerJoiner) onMissMatch(outer chunk.Row, chk *chunk.Chunk) {
diff --git a/util/chunk/chunk_util.go b/util/chunk/chunk_util.go
new file mode 100644
index 0000000000000..71b0f88768575
--- /dev/null
+++ b/util/chunk/chunk_util.go
@@ -0,0 +1,107 @@
+// Copyright 2018 PingCAP, Inc.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package chunk
+
+// CopySelectedJoinRows copies the selected joined rows from the source Chunk
+// to the destination Chunk.
+// It returns true if at least one joined row is selected.
+//
+// NOTE: All the outer rows in the source Chunk should be the same.
+func CopySelectedJoinRows(src *Chunk, innerColOffset, outerColOffset int, selected []bool, dst *Chunk) bool {
+	if src.NumRows() == 0 {
+		return false
+	}
+
+	numSelected := copySelectedInnerRows(innerColOffset, outerColOffset, src, selected, dst)
+	copyOuterRows(innerColOffset, outerColOffset, src, numSelected, dst)
+	dst.numVirtualRows += numSelected
+	return numSelected > 0
+}
+
+// copySelectedInnerRows copies the selected inner rows from the source Chunk
+// to the destination Chunk.
+// It returns the number of selected rows.
+func copySelectedInnerRows(innerColOffset, outerColOffset int, src *Chunk, selected []bool, dst *Chunk) int {
+	oldLen := dst.columns[innerColOffset].length
+	var srcCols []*column
+	if innerColOffset == 0 {
+		srcCols = src.columns[:outerColOffset]
+	} else {
+		srcCols = src.columns[innerColOffset:]
+	}
+	for j, srcCol := range srcCols {
+		dstCol := dst.columns[innerColOffset+j]
+		if srcCol.isFixed() {
+			for i := 0; i < len(selected); i++ {
+				if !selected[i] {
+					continue
+				}
+				dstCol.appendNullBitmap(!srcCol.isNull(i))
+				dstCol.length++
+
+				elemLen := len(srcCol.elemBuf)
+				offset := i * elemLen
+				dstCol.data = append(dstCol.data, srcCol.data[offset:offset+elemLen]...)
+			}
+		} else {
+			for i := 0; i < len(selected); i++ {
+				if !selected[i] {
+					continue
+				}
+				dstCol.appendNullBitmap(!srcCol.isNull(i))
+				dstCol.length++
+
+				start, end := srcCol.offsets[i], srcCol.offsets[i+1]
+				dstCol.data = append(dstCol.data, srcCol.data[start:end]...)
+				dstCol.offsets = append(dstCol.offsets, int32(len(dstCol.data)))
+			}
+		}
+	}
+	return dst.columns[innerColOffset].length - oldLen
+}
+
+// copyOuterRows copies the first contiguous 'numRows' outer rows in the source Chunk
+// to the destination Chunk. These outer rows are assumed to be identical.
+func copyOuterRows(innerColOffset, outerColOffset int, src *Chunk, numRows int, dst *Chunk) {
+	if numRows <= 0 {
+		return
+	}
+	row := src.GetRow(0)
+	var srcCols []*column
+	if innerColOffset == 0 {
+		srcCols = src.columns[outerColOffset:]
+	} else {
+		srcCols = src.columns[:innerColOffset]
+	}
+	for i, srcCol := range srcCols {
+		dstCol := dst.columns[outerColOffset+i]
+		dstCol.appendMultiSameNullBitmap(!srcCol.isNull(row.idx), numRows)
+		dstCol.length += numRows
+		if srcCol.isFixed() {
+			elemLen := len(srcCol.elemBuf)
+			start := row.idx * elemLen
+			end := start + numRows*elemLen
+			dstCol.data = append(dstCol.data, srcCol.data[start:end]...)
+		} else {
+			start, end := srcCol.offsets[row.idx], srcCol.offsets[row.idx+numRows]
+			dstCol.data = append(dstCol.data, srcCol.data[start:end]...)
+			offsets := dstCol.offsets
+			elemLen := srcCol.offsets[row.idx+1] - srcCol.offsets[row.idx]
+			for j := 0; j < numRows; j++ {
+				offsets = append(offsets, int32(offsets[len(offsets)-1]+elemLen))
+			}
+			dstCol.offsets = offsets
+		}
+	}
+}
diff --git a/util/chunk/chunk_util_test.go b/util/chunk/chunk_util_test.go
new file mode 100644
index 0000000000000..9ed6a662def19
--- /dev/null
+++ b/util/chunk/chunk_util_test.go
@@ -0,0 +1,83 @@
+// Copyright 2018 PingCAP, Inc.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package chunk
+
+import (
+	"reflect"
+	"testing"
+
+	"github.com/pingcap/tidb/types"
+)
+
+func getChk() (*Chunk, *Chunk, []bool) {
+	numRows := 1024
+	srcChk := newChunkWithInitCap(numRows, 0, 0, 8, 8, 16, 0)
+	selected := make([]bool, numRows)
+	var row Row
+	for j := 0; j < numRows; j++ {
+		if j%7 == 0 {
+			row = MutRowFromValues("abc", "abcdefg", nil, 123, types.ZeroDatetime, "abcdefg").ToRow()
+		} else {
+			row = MutRowFromValues("abc", "abcdefg", j, 123, types.ZeroDatetime, "abcdefg").ToRow()
+			selected[j] = true
+		}
+		srcChk.AppendPartialRow(0, row)
+	}
+	dstChk := newChunkWithInitCap(numRows, 0, 0, 8, 8, 16, 0)
+	return srcChk, dstChk, selected
+}
+
+func TestCopySelectedJoinRows(t *testing.T) {
+	srcChk, dstChk, selected := getChk()
+	numRows := srcChk.NumRows()
+	for i := 0; i < numRows; i++ {
+		if !selected[i] {
+			continue
+		}
+		dstChk.AppendRow(srcChk.GetRow(i))
+	}
+	// batch copy
+	dstChk2 := newChunkWithInitCap(numRows, 0, 0, 8, 8, 16, 0)
+	CopySelectedJoinRows(srcChk, 0, 3, selected, dstChk2)
+
+	if !reflect.DeepEqual(dstChk, dstChk2) {
+		t.Fatal("the batch-copied chunk is not equal to the row-by-row result")
+	}
+}
+
+func BenchmarkCopySelectedJoinRows(b *testing.B) {
+	b.ReportAllocs()
+	srcChk, dstChk, selected := getChk()
+	b.ResetTimer()
+	for i := 0; i < b.N; i++ {
+		dstChk.Reset()
+		CopySelectedJoinRows(srcChk, 0, 3, selected, dstChk)
+	}
+}
+
+func BenchmarkAppendSelectedRow(b *testing.B) {
+	b.ReportAllocs()
+	srcChk, dstChk, selected := getChk()
+	numRows := srcChk.NumRows()
+	b.ResetTimer()
+	for i := 0; i < b.N; i++ {
+		dstChk.Reset()
+		for j := 0; j < numRows; j++ {
+			if !selected[j] {
+				continue
+			}
+			dstChk.AppendRow(srcChk.GetRow(j))
+		}
+	}
+}
diff --git a/util/chunk/column.go b/util/chunk/column.go
index 903adbeebc5c2..0720eb7951533 100644
--- a/util/chunk/column.go
+++ b/util/chunk/column.go
@@ -72,12 +72,12 @@ func (c *column) isNull(rowIdx int) bool {
 	return nullByte&(1<<(uint(rowIdx)&7)) == 0
 }
 
-func (c *column) appendNullBitmap(on bool) {
+func (c *column) appendNullBitmap(notNull bool) {
 	idx := c.length >> 3
 	if idx >= len(c.nullBitmap) {
 		c.nullBitmap = append(c.nullBitmap, 0)
 	}
-	if on {
+	if notNull {
 		pos := uint(c.length) & 7
 		c.nullBitmap[idx] |= byte(1 << pos)
 	} else {
@@ -85,6 +85,32 @@ func (c *column) appendNullBitmap(on bool) {
 	}
 }
 
+// appendMultiSameNullBitmap appends multiple bits of the same value to `nullBitmap`.
+// notNull indicates whether the appended bits represent not-null values.
+// num is the number of bits to append.
+func (c *column) appendMultiSameNullBitmap(notNull bool, num int) {
+	numNewBytes := ((c.length + num + 7) >> 3) - len(c.nullBitmap)
+	b := byte(0)
+	if notNull {
+		b = 0xff
+	}
+	for i := 0; i < numNewBytes; i++ {
+		c.nullBitmap = append(c.nullBitmap, b)
+	}
+	if !notNull {
+		c.nullCount += num
+		return
+	}
+	// 1. Set all the remaining bits in the last byte of the old c.nullBitmap to 1.
+	numRemainingBits := uint(c.length % 8)
+	bitMask := byte(^((1 << numRemainingBits) - 1))
+	c.nullBitmap[c.length/8] |= bitMask
+	// 2. Set all the redundant bits in the last byte of the new c.nullBitmap to 0.
+	numRedundantBits := uint(len(c.nullBitmap)*8 - c.length - num)
+	bitMask = byte(1<<(8-numRedundantBits)) - 1
+	c.nullBitmap[len(c.nullBitmap)-1] &= bitMask
+}
+
 func (c *column) appendNull() {
 	c.appendNullBitmap(false)
 	if c.isFixed() {
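A note for reviewers on the two-step bitmask arithmetic in `appendMultiSameNullBitmap` above: the sketch below reproduces the same mask computation on a plain byte slice so it can be run in isolation. It is not part of the patch; the helper name `appendNotNullBits` and the example values are made up for illustration, and it assumes the LSB-first bit layout used by `column.isNull`.

```go
package main

import "fmt"

// appendNotNullBits appends `num` not-null (set) bits to a bitmap that already
// holds `length` valid bits, mirroring the not-null branch of
// appendMultiSameNullBitmap. Bits are stored LSB-first within each byte.
func appendNotNullBits(bitmap []byte, length, num int) []byte {
	// Grow the bitmap and fill every new byte with 1s.
	numNewBytes := ((length + num + 7) >> 3) - len(bitmap)
	for i := 0; i < numNewBytes; i++ {
		bitmap = append(bitmap, 0xff)
	}
	// 1. Set the unused high bits of the last old byte to 1.
	numRemainingBits := uint(length % 8)
	bitmap[length/8] |= byte(^((1 << numRemainingBits) - 1))
	// 2. Clear the bits beyond position length+num in the last byte.
	numRedundantBits := uint(len(bitmap)*8 - length - num)
	bitmap[len(bitmap)-1] &= byte(1<<(8-numRedundantBits)) - 1
	return bitmap
}

func main() {
	// 5 existing (null) bits, then 10 appended not-null bits:
	// bits 5..14 become 1, bit 15 stays 0.
	fmt.Printf("%08b\n", appendNotNullBits([]byte{0x00}, 5, 10))
	// Prints: [11100000 01111111]
}
```

Step 2 is what keeps the bits past position `length+num` zeroed when the appended run ends in the middle of a byte, so a later `appendNullBitmap` call can keep OR-ing bits into the same byte.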