Skip to content

Commit

Permalink
executor: implement cteutil.Storage (#24193)
Browse files Browse the repository at this point in the history
  • Loading branch information
guo-shaoge committed May 24, 2021
1 parent 23ce657 commit 26cf50e
Show file tree
Hide file tree
Showing 2 changed files with 533 additions and 0 deletions.
271 changes: 271 additions & 0 deletions util/cteutil/storage.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,271 @@
// Copyright 2021 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.

package cteutil

import (
"sync"

"github.com/pingcap/errors"
"github.com/pingcap/tidb/types"
"github.com/pingcap/tidb/util/chunk"
"github.com/pingcap/tidb/util/disk"
"github.com/pingcap/tidb/util/memory"
)

// Compile-time assertion that *StorageRC implements Storage.
// A typed nil pointer is used so that no StorageRC value is allocated.
var _ Storage = (*StorageRC)(nil)

// Storage is a temporary storage to store the intermediate data of CTE.
//
// Common usage as follows:
//
//	storage.Lock()
//	if !storage.Done() {
//		fill all data into storage
//	}
//	storage.Unlock()
//	read data from storage
type Storage interface {
// OpenAndRef opens the underlying storage when it is called for the first time.
// Every later call only increases the ref count by one.
OpenAndRef() error

// DerefAndClose decreases the ref count by one.
// If the ref count reaches zero, the underlying storage is closed.
DerefAndClose() (err error)

// SwapData swaps data of two storage.
// Other meta info, such as the ref count and the done flag, is not touched.
SwapData(other Storage) error

// Reopen resets the storage and its related info,
// so the status of the Storage is like a newly created one.
Reopen() error

// Add adds a chunk into the underlying storage.
Add(chk *chunk.Chunk) error

// GetChunk gets a chunk by its index.
GetChunk(chkIdx int) (*chunk.Chunk, error)

// GetRow gets a row by its RowPtr.
GetRow(ptr chunk.RowPtr) (chunk.Row, error)

// NumChunks returns the number of chunks in the underlying storage.
NumChunks() int

// Storage is not thread-safe.
// Callers use Lock() and Unlock() around their accesses to ensure thread safety.
Lock()
Unlock()

// Usually, Storage is filled first, then users can read it.
// Done lets a user check whether the Storage has already been filled;
// if it has not, the user can fill it. SetDone sets that flag.
Done() bool
SetDone()

// Readers use the iter information to determine
// whether they need to read data from the beginning.
SetIter(iter int)
GetIter() int

// GetBegCh returns the channel used to notify readers that the Storage is ready to read.
// It exists only to solve the special implementation of IndexLookUpJoin.
// We will find a better way and remove this later.
GetBegCh() chan struct{}

// GetMemTracker returns the memory tracker of the storage.
GetMemTracker() *memory.Tracker
// GetDiskTracker returns the disk tracker of the storage.
GetDiskTracker() *disk.Tracker
// ActionSpill returns a memory.ActionOnExceed.
// NOTE(review): judging by the name this is the spill-to-disk action of the
// underlying storage; confirm against the implementation.
ActionSpill() memory.ActionOnExceed
}

// StorageRC implements Storage interface using RowContainer.
type StorageRC struct {
// mu backs Lock/Unlock. No other method of StorageRC acquires it,
// so callers are responsible for taking the lock themselves.
mu sync.Mutex
// refCnt is the reference count maintained by OpenAndRef and DerefAndClose.
// resetAll sets it to -1.
refCnt int
// tp and chkSize are the arguments handed to chunk.NewRowContainer
// each time a RowContainer is created (OpenAndRef, Reopen).
tp []*types.FieldType
chkSize int

// begCh is the channel returned by GetBegCh.
begCh chan struct{}
// done is the flag read by Done and set by SetDone.
done bool
// iter is the value stored by SetIter and returned by GetIter.
iter int

// rc is the underlying RowContainer that holds the data.
// It is nil until OpenAndRef creates it and nil again after resetAll succeeds.
rc *chunk.RowContainer
}

// NewStorageRC creates a new StorageRC for rows of the given field types.
// Only tp and chkSize are recorded here; the underlying RowContainer is not
// created until OpenAndRef is called.
func NewStorageRC(tp []*types.FieldType, chkSize int) *StorageRC {
	storage := new(StorageRC)
	storage.tp = tp
	storage.chkSize = chkSize
	return storage
}

// OpenAndRef impls Storage OpenAndRef interface.
//
// If the storage is already open, only the ref count is increased.
// Otherwise a new RowContainer and a new begCh are created, the ref count
// starts at one and iter is reset to zero. It always returns nil.
func (s *StorageRC) OpenAndRef() (err error) {
	if s.valid() {
		// Already open: just take one more reference.
		s.refCnt++
		return nil
	}
	// First open (or re-open after the storage was fully closed).
	s.iter = 0
	s.refCnt = 1
	s.begCh = make(chan struct{})
	s.rc = chunk.NewRowContainer(s.tp, s.chkSize)
	return nil
}

// DerefAndClose impls Storage DerefAndClose interface.
//
// It decreases the ref count by one. When the count drops to zero, the
// underlying RowContainer is closed and all fields are reset through
// resetAll, after which the storage is no longer valid until OpenAndRef is
// called again.
//
// It returns an error if the storage is not open, or if closing or resetting
// the RowContainer fails.
func (s *StorageRC) DerefAndClose() (err error) {
	if !s.valid() {
		return errors.New("Storage not opend yet")
	}
	// valid() guarantees refCnt > 0 at this point, so the decrement can never
	// make the count negative; no separate underflow check is needed.
	s.refCnt--
	if s.refCnt > 0 {
		// Other holders still reference the storage.
		return nil
	}
	// TODO: unreg memtracker
	if err = s.rc.Close(); err != nil {
		return err
	}
	return s.resetAll()
}

// SwapData impls Storage SwapData interface.
//
// It exchanges the field types, the chunk size and the RowContainer of s with
// those of other. The ref count, done flag, iter and begCh of both storages
// are left alone. other must be a *StorageRC; otherwise an error is returned
// and nothing is swapped.
func (s *StorageRC) SwapData(other Storage) (err error) {
	peer, isRC := other.(*StorageRC)
	if !isRC {
		return errors.New("cannot swap if underlying storages are different")
	}
	// Stash our own data, take the peer's, then hand ours over.
	tp, chkSize, rc := s.tp, s.chkSize, s.rc
	s.tp, s.chkSize, s.rc = peer.tp, peer.chkSize, peer.rc
	peer.tp, peer.chkSize, peer.rc = tp, chkSize, rc
	return nil
}

// Reopen impls Storage Reopen interface.
//
// It resets the current RowContainer, clears iter and the done flag, creates
// a fresh begCh and finally replaces the RowContainer with a brand-new one
// built from s.tp and s.chkSize. The ref count is not changed.
//
// It returns an error if the storage holds no RowContainer (never opened, or
// already fully closed) or if resetting the current RowContainer fails.
func (s *StorageRC) Reopen() (err error) {
	// Without a RowContainer there is nothing to reset; report this as an
	// error, as Add/GetChunk/GetRow do, instead of calling Reset on a nil
	// RowContainer.
	if s.rc == nil {
		return errors.New("Storage is not valid")
	}
	if err = s.rc.Reset(); err != nil {
		return err
	}
	s.iter = 0
	s.begCh = make(chan struct{})
	s.done = false
	// Create a new RowContainer.
	// Some meta info in the old RowContainer, such as memTracker/actionSpill,
	// is not reset by Reset, so we just use a new one.
	s.rc = chunk.NewRowContainer(s.tp, s.chkSize)
	return nil
}

// Add impls Storage Add interface.
//
// It appends chk to the underlying RowContainer. A chunk without rows is
// ignored and reported as success. An error is returned when the storage is
// not valid (checked before the chunk is inspected) or when the RowContainer
// rejects the chunk.
func (s *StorageRC) Add(chk *chunk.Chunk) (err error) {
	switch {
	case !s.valid():
		return errors.New("Storage is not valid")
	case chk.NumRows() == 0:
		// Nothing to store.
		return nil
	default:
		return s.rc.Add(chk)
	}
}

// GetChunk impls Storage GetChunk interface.
//
// It returns the chunk at index chkIdx from the underlying RowContainer, or
// a nil chunk and an error when the storage is not valid.
func (s *StorageRC) GetChunk(chkIdx int) (*chunk.Chunk, error) {
	if s.valid() {
		return s.rc.GetChunk(chkIdx)
	}
	return nil, errors.New("Storage is not valid")
}

// GetRow impls Storage GetRow interface.
//
// It looks up the row addressed by ptr in the underlying RowContainer, or
// returns a zero chunk.Row and an error when the storage is not valid.
func (s *StorageRC) GetRow(ptr chunk.RowPtr) (chunk.Row, error) {
	if s.valid() {
		return s.rc.GetRow(ptr)
	}
	var zero chunk.Row
	return zero, errors.New("Storage is not valid")
}

// NumChunks impls Storage NumChunks interface.
// It returns the chunk count reported by the underlying RowContainer.
// Unlike Add/GetChunk/GetRow it does not call valid() first.
// NOTE(review): presumably this panics when s.rc is nil (storage not opened
// or already closed) — confirm callers only use it on an open storage.
func (s *StorageRC) NumChunks() int {
return s.rc.NumChunks()
}

// Lock impls Storage Lock interface.
// It locks s.mu; pair every call with Unlock.
func (s *StorageRC) Lock() {
s.mu.Lock()
}

// Unlock impls Storage Unlock interface.
// It unlocks s.mu, which was locked by Lock.
func (s *StorageRC) Unlock() {
s.mu.Unlock()
}

// Done impls Storage Done interface.
// It returns the done flag, which is set by SetDone and cleared by Reopen
// and resetAll. The method itself does not take s.mu.
func (s *StorageRC) Done() bool {
return s.done
}

// SetDone impls Storage SetDone interface.
// It sets the done flag to true. The method itself does not take s.mu.
func (s *StorageRC) SetDone() {
s.done = true
}

// SetIter impls Storage SetIter interface.
// It stores iter so that it can be read back through GetIter.
func (s *StorageRC) SetIter(iter int) {
s.iter = iter
}

// GetIter impls Storage GetIter interface.
// It returns the value last stored by SetIter; OpenAndRef (on first open),
// Reopen and resetAll set it back to zero.
func (s *StorageRC) GetIter() int {
return s.iter
}

// GetBegCh impls Storage GetBegCh interface.
// It returns s.begCh. A new channel is created by OpenAndRef (on first open)
// and by Reopen; resetAll sets it to nil, so the result may be nil when the
// storage is not open.
func (s *StorageRC) GetBegCh() chan struct{} {
return s.begCh
}

// GetMemTracker impls Storage GetMemTracker interface.
// It returns the memory tracker of the underlying RowContainer.
// There is no valid() check here, so s.rc must be non-nil when this is called.
func (s *StorageRC) GetMemTracker() *memory.Tracker {
return s.rc.GetMemTracker()
}

// GetDiskTracker impls Storage GetDiskTracker interface.
// It returns the disk tracker of the underlying RowContainer.
// There is no valid() check here, so s.rc must be non-nil when this is called.
// NOTE(review): the Storage interface declares the result as *disk.Tracker
// while this method spells it *memory.Tracker; presumably disk.Tracker is an
// alias of memory.Tracker — confirm, and consider using *disk.Tracker here
// for consistency.
func (s *StorageRC) GetDiskTracker() *memory.Tracker {
return s.rc.GetDiskTracker()
}

// ActionSpill impls Storage ActionSpill interface.
// It returns whatever the underlying RowContainer's ActionSpill returns.
// There is no valid() check here, so s.rc must be non-nil when this is called.
func (s *StorageRC) ActionSpill() memory.ActionOnExceed {
return s.rc.ActionSpill()
}

// ActionSpillForTest is for test.
// It forwards to the underlying RowContainer's ActionSpillForTest and is not
// part of the Storage interface. s.rc must be non-nil when this is called.
func (s *StorageRC) ActionSpillForTest() *chunk.SpillDiskAction {
return s.rc.ActionSpillForTest()
}

// resetAll puts the storage back into the "not opened" state: the ref count
// becomes -1, begCh is dropped, done and iter are cleared, and the
// RowContainer is reset and then released.
//
// The bookkeeping fields are cleared before the RowContainer is reset, so if
// Reset fails its error is returned and s.rc is kept as is.
func (s *StorageRC) resetAll() error {
	s.refCnt, s.iter = -1, 0
	s.begCh, s.done = nil, false

	err := s.rc.Reset()
	if err == nil {
		s.rc = nil
	}
	return err
}

// valid reports whether the storage is currently open, i.e. it holds a
// RowContainer and its ref count is positive.
func (s *StorageRC) valid() bool {
	if s.refCnt <= 0 {
		return false
	}
	return s.rc != nil
}
Loading

0 comments on commit 26cf50e

Please sign in to comment.