Skip to content

Commit

Permalink
executor: add batch copy to inner join, left and right outer join. (#…
Browse files Browse the repository at this point in the history
  • Loading branch information
crazycs520 committed Sep 5, 2018
1 parent 15e709c commit 92e6a5a
Show file tree
Hide file tree
Showing 4 changed files with 241 additions and 17 deletions.
38 changes: 23 additions & 15 deletions executor/joiner.go
Original file line number Diff line number Diff line change
Expand Up @@ -158,19 +158,18 @@ func (j *baseJoiner) makeShallowJoinRow(isRightJoin bool, inner, outer chunk.Row
j.shallowRow.ShallowCopyPartialRow(inner.Len(), outer)
}

func (j *baseJoiner) filter(input, output *chunk.Chunk) (matched bool, err error) {
func (j *baseJoiner) filter(input, output *chunk.Chunk, outerColsLen int) (bool, error) {
var err error
j.selected, err = expression.VectorizedFilter(j.ctx, j.conditions, chunk.NewIterator4Chunk(input), j.selected)
if err != nil {
return false, errors.Trace(err)
}
for i := 0; i < len(j.selected); i++ {
if !j.selected[i] {
continue
}
matched = true
output.AppendRow(input.GetRow(i))
// Batch copies selected rows to output chunk.
innerColOffset, outerColOffset := 0, input.NumCols()-outerColsLen
if !j.outerIsRight {
innerColOffset, outerColOffset = outerColsLen, 0
}
return matched, nil
return chunk.CopySelectedJoinRows(input, innerColOffset, outerColOffset, j.selected, output), nil
}

type semiJoiner struct {
Expand Down Expand Up @@ -350,8 +349,11 @@ func (j *leftOuterJoiner) tryToMatch(outer chunk.Row, inners chunk.Iterator, chk
}

// reach here, chkForJoin is j.chk
matched, err := j.filter(chkForJoin, chk)
return matched, errors.Trace(err)
matched, err := j.filter(chkForJoin, chk, outer.Len())
if err != nil {
return false, errors.Trace(err)
}
return matched, nil
}

func (j *leftOuterJoiner) onMissMatch(outer chunk.Row, chk *chunk.Chunk) {
Expand Down Expand Up @@ -384,9 +386,11 @@ func (j *rightOuterJoiner) tryToMatch(outer chunk.Row, inners chunk.Iterator, ch
return true, nil
}

// reach here, chkForJoin is j.chk
matched, err := j.filter(chkForJoin, chk)
return matched, errors.Trace(err)
matched, err := j.filter(chkForJoin, chk, outer.Len())
if err != nil {
return false, errors.Trace(err)
}
return matched, nil
}

func (j *rightOuterJoiner) onMissMatch(outer chunk.Row, chk *chunk.Chunk) {
Expand Down Expand Up @@ -421,8 +425,12 @@ func (j *innerJoiner) tryToMatch(outer chunk.Row, inners chunk.Iterator, chk *ch
}

// reach here, chkForJoin is j.chk
matched, err := j.filter(chkForJoin, chk)
return matched, errors.Trace(err)
matched, err := j.filter(chkForJoin, chk, outer.Len())
if err != nil {
return false, errors.Trace(err)
}
return matched, nil

}

func (j *innerJoiner) onMissMatch(outer chunk.Row, chk *chunk.Chunk) {
Expand Down
107 changes: 107 additions & 0 deletions util/chunk/chunk_util.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,107 @@
// Copyright 2018 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.

package chunk

// CopySelectedJoinRows batch-copies the joined rows of src that are marked in
// selected into dst.
//
// innerColOffset and outerColOffset are the column indexes at which the inner
// and outer sides of a joined row start; they are forwarded unchanged to the
// two copy helpers.
//
// It reports whether at least one joined row was selected and copied.
//
// NOTE: All the outer rows in the source Chunk should be the same.
func CopySelectedJoinRows(src *Chunk, innerColOffset, outerColOffset int, selected []bool, dst *Chunk) bool {
	// An empty source has nothing to copy.
	if src.NumRows() == 0 {
		return false
	}

	// The inner side is copied row by row; the outer side is then appended in
	// one shot, once per inner row that was copied.
	copied := copySelectedInnerRows(innerColOffset, outerColOffset, src, selected, dst)
	copyOuterRows(innerColOffset, outerColOffset, src, copied, dst)

	// Keep the virtual row counter in step with the rows just appended.
	dst.numVirtualRows += copied
	return copied > 0
}

// copySelectedInnerRows copies the inner columns of every selected row from
// the source Chunk to the destination Chunk and returns the number of selected
// rows.
//
// The inner columns are src.columns[:outerColOffset] when innerColOffset is 0
// and src.columns[innerColOffset:] otherwise. Inner column j of the source is
// appended to dst.columns[innerColOffset+j].
//
// The returned count is taken from `selected` itself rather than from the
// length of a destination column, so it stays correct (and does not index out
// of range) when the inner side has no columns at all.
//
// NOTE(review): when both offsets are 0 the inner side is treated as empty; an
// outer side without columns cannot be told apart from that case through this
// signature.
func copySelectedInnerRows(innerColOffset, outerColOffset int, src *Chunk, selected []bool, dst *Chunk) int {
	var srcCols []*column
	if innerColOffset == 0 {
		srcCols = src.columns[:outerColOffset]
	} else {
		srcCols = src.columns[innerColOffset:]
	}

	numSelected := 0
	for _, isSelected := range selected {
		if isSelected {
			numSelected++
		}
	}
	if numSelected == 0 {
		return 0
	}

	for j, srcCol := range srcCols {
		dstCol := dst.columns[innerColOffset+j]
		if srcCol.isFixed() {
			// Fixed-width column: every element occupies len(elemBuf) bytes.
			elemLen := len(srcCol.elemBuf)
			for i, isSelected := range selected {
				if !isSelected {
					continue
				}
				dstCol.appendNullBitmap(!srcCol.isNull(i))
				dstCol.length++

				offset := i * elemLen
				dstCol.data = append(dstCol.data, srcCol.data[offset:offset+elemLen]...)
			}
			continue
		}

		// Variable-width column: element i is data[offsets[i]:offsets[i+1]],
		// and the destination records the end offset of each appended element.
		for i, isSelected := range selected {
			if !isSelected {
				continue
			}
			dstCol.appendNullBitmap(!srcCol.isNull(i))
			dstCol.length++

			start, end := srcCol.offsets[i], srcCol.offsets[i+1]
			dstCol.data = append(dstCol.data, srcCol.data[start:end]...)
			dstCol.offsets = append(dstCol.offsets, int32(len(dstCol.data)))
		}
	}
	return numSelected
}

// copyOuterRows appends the first 'numRows' consecutive outer rows of the
// source Chunk to the outer columns of the destination Chunk.
//
// The outer columns are src.columns[outerColOffset:] when innerColOffset is 0
// and src.columns[:innerColOffset] otherwise. The null flag and, for
// variable-width columns, the element length of the first row are reused for
// every appended row, so the copied rows are expected to be identical.
func copyOuterRows(innerColOffset, outerColOffset int, src *Chunk, numRows int, dst *Chunk) {
	if numRows <= 0 {
		return
	}

	first := src.GetRow(0).idx

	outerCols := src.columns[:innerColOffset]
	if innerColOffset == 0 {
		outerCols = src.columns[outerColOffset:]
	}

	for i, from := range outerCols {
		to := dst.columns[outerColOffset+i]
		to.appendMultiSameNullBitmap(!from.isNull(first), numRows)
		to.length += numRows

		if from.isFixed() {
			// Fixed-width: one contiguous byte range covers all rows.
			width := len(from.elemBuf)
			begin := first * width
			to.data = append(to.data, from.data[begin:begin+numRows*width]...)
			continue
		}

		// Variable-width: copy the bytes in one go, then extend the offsets
		// by the length of the first element once per appended row.
		begin, end := from.offsets[first], from.offsets[first+numRows]
		to.data = append(to.data, from.data[begin:end]...)
		width := from.offsets[first+1] - from.offsets[first]
		for n := 0; n < numRows; n++ {
			to.offsets = append(to.offsets, int32(to.offsets[len(to.offsets)-1]+width))
		}
	}
}
83 changes: 83 additions & 0 deletions util/chunk/chunk_util_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
// Copyright 2018 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.

package chunk

import (
"reflect"
"testing"

"github.com/pingcap/tidb/types"
)

// getChk builds the fixtures shared by the test and the benchmarks in this
// file: a source chunk of 1024 six-column rows, an empty destination chunk
// with the same layout, and the selection vector for the source rows.
//
// Every seventh row (index 0, 7, 14, ...) carries nil in its third column and
// is left unselected; all other rows carry their own index there and are
// selected. The remaining five columns hold the same values in every row.
func getChk() (*Chunk, *Chunk, []bool) {
	const numRows = 1024

	src := newChunkWithInitCap(numRows, 0, 0, 8, 8, 16, 0)
	selected := make([]bool, numRows)
	for idx := 0; idx < numRows; idx++ {
		// Third column: nil for the unselected rows, the row index otherwise.
		var third interface{}
		if idx%7 != 0 {
			third = idx
			selected[idx] = true
		}
		row := MutRowFromValues("abc", "abcdefg", third, 123, types.ZeroDatetime, "abcdefg").ToRow()
		src.AppendPartialRow(0, row)
	}

	dst := newChunkWithInitCap(numRows, 0, 0, 8, 8, 16, 0)
	return src, dst, selected
}

// TestCopySelectedJoinRows checks that the batch copy produces exactly the
// same chunk as appending the selected rows one at a time with AppendRow, and
// that it reports a match when the fixture selects rows.
func TestCopySelectedJoinRows(t *testing.T) {
	srcChk, dstChk, selected := getChk()
	numRows := srcChk.NumRows()

	// Reference result: copy the selected rows one by one.
	for i := 0; i < numRows; i++ {
		if !selected[i] {
			continue
		}
		dstChk.AppendRow(srcChk.GetRow(i))
	}

	// Batch copy: columns 0-2 are the inner side, columns 3-5 the outer side.
	dstChk2 := newChunkWithInitCap(numRows, 0, 0, 8, 8, 16, 0)
	if matched := CopySelectedJoinRows(srcChk, 0, 3, selected, dstChk2); !matched {
		t.Fatal("CopySelectedJoinRows returned false although the fixture selects rows")
	}

	if !reflect.DeepEqual(dstChk, dstChk2) {
		t.Fatal("batch-copied chunk differs from the chunk built row by row with AppendRow")
	}
}

// BenchmarkCopySelectedJoinRows measures the batch copy of the selected joined
// rows of the shared fixture into a destination chunk that is reset before
// every iteration.
func BenchmarkCopySelectedJoinRows(b *testing.B) {
	b.ReportAllocs()
	src, dst, selected := getChk()

	// Exclude fixture construction from the measurement.
	b.ResetTimer()
	for iter := 0; iter < b.N; iter++ {
		dst.Reset()
		CopySelectedJoinRows(src, 0, 3, selected, dst)
	}
}

// BenchmarkAppendSelectedRow measures the row-at-a-time baseline: every
// selected row of the shared fixture is appended to the destination chunk
// with AppendRow, and the destination is reset before every iteration.
func BenchmarkAppendSelectedRow(b *testing.B) {
	b.ReportAllocs()
	src, dst, selected := getChk()
	rowCount := src.NumRows()

	// Exclude fixture construction from the measurement.
	b.ResetTimer()
	for iter := 0; iter < b.N; iter++ {
		dst.Reset()
		for rowIdx := 0; rowIdx < rowCount; rowIdx++ {
			if selected[rowIdx] {
				dst.AppendRow(src.GetRow(rowIdx))
			}
		}
	}
}
30 changes: 28 additions & 2 deletions util/chunk/column.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,19 +72,45 @@ func (c *column) isNull(rowIdx int) bool {
return nullByte&(1<<(uint(rowIdx)&7)) == 0
}

func (c *column) appendNullBitmap(on bool) {
func (c *column) appendNullBitmap(notNull bool) {
idx := c.length >> 3
if idx >= len(c.nullBitmap) {
c.nullBitmap = append(c.nullBitmap, 0)
}
if on {
if notNull {
pos := uint(c.length) & 7
c.nullBitmap[idx] |= byte(1 << pos)
} else {
c.nullCount++
}
}

// appendMultiSameNullBitmap appends `num` bits of the same value to
// c.nullBitmap, starting at bit position c.length.
//
// notNull is the value of the appended bits: true appends set bits (not null),
// false appends cleared bits (null) and adds `num` to c.nullCount.
// num is the number of bits to append; a value <= 0 is a no-op.
//
// c.length is not modified here; the caller advances it.
func (c *column) appendMultiSameNullBitmap(notNull bool, num int) {
	// Nothing to append. Without this guard the not-null path below would
	// index c.nullBitmap[c.length/8], which is out of range when c.length is
	// a multiple of 8 and no new byte is added.
	if num <= 0 {
		return
	}
	numNewBytes := ((c.length + num + 7) >> 3) - len(c.nullBitmap)
	b := byte(0)
	if notNull {
		b = 0xff
	}
	for i := 0; i < numNewBytes; i++ {
		c.nullBitmap = append(c.nullBitmap, b)
	}
	if !notNull {
		// The appended bytes are already zero; only the counter changes.
		c.nullCount += num
		return
	}
	// 1. Set all the remaining bits in the last slot of old c.nullBitmap to 1.
	numRemainingBits := uint(c.length % 8)
	bitMask := byte(^((1 << numRemainingBits) - 1))
	c.nullBitmap[c.length/8] |= bitMask
	// 2. Set all the redundant bits in the last slot of new c.nullBitmap to 0.
	// When there are no redundant bits the byte-typed shift wraps to 0 and the
	// mask becomes 0xff, leaving the last slot unchanged.
	numRedundantBits := uint(len(c.nullBitmap)*8 - c.length - num)
	bitMask = byte(1<<(8-numRedundantBits)) - 1
	c.nullBitmap[len(c.nullBitmap)-1] &= bitMask
}

func (c *column) appendNull() {
c.appendNullBitmap(false)
if c.isFixed() {
Expand Down

0 comments on commit 92e6a5a

Please sign in to comment.