Skip to content

Commit

Permalink
Merge branch 'master' into tidb-as-library
Browse files Browse the repository at this point in the history
  • Loading branch information
tisonkun committed Apr 27, 2022
2 parents 63114d2 + 1a19f95 commit fb32d69
Show file tree
Hide file tree
Showing 6 changed files with 64 additions and 25 deletions.
2 changes: 1 addition & 1 deletion ddl/ddl_tiflash_api.go
Original file line number Diff line number Diff line change
Expand Up @@ -343,7 +343,7 @@ func updateTiFlashStores(pollTiFlashContext *TiFlashManagementContext) error {
}
}
}
logutil.BgLogger().Info("updateTiFlashStores finished", zap.Int("TiFlash store count", len(pollTiFlashContext.TiFlashStores)))
logutil.BgLogger().Debug("updateTiFlashStores finished", zap.Int("TiFlash store count", len(pollTiFlashContext.TiFlashStores)))
return nil
}

Expand Down
6 changes: 6 additions & 0 deletions executor/parallel_apply_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,12 @@ func TestParallelApplyPlan(t *testing.T) {
q2 := "select * from t t0 where t0.b <= (select max(t1.b) from t t1 where t1.b > (select max(b) from t t2 where t1.a > t2.a and t0.a > t2.a));"
checkApplyPlan(t, tk, q2, 1) // only the outside apply can be parallel
tk.MustQuery(q2).Sort().Check(testkit.Rows("1 1", "2 2", "3 3", "4 4", "5 5", "6 6", "7 7", "8 8", "9 9"))
q3 := "select t1.b from t t1 where t1.b > (select max(b) from t t2 where t1.a > t2.a) order by t1.a"
checkApplyPlan(t, tk, q3, 0)
tk.MustExec("alter table t add index idx(a)")
checkApplyPlan(t, tk, q3, 1)
tk.MustQuery(q3).Sort().Check(testkit.Rows("1", "2", "3", "4", "5", "6", "7", "8", "9"))
tk.MustQuery("show warnings").Check(testkit.Rows("Warning 1105 Parallel Apply rejects the possible order properties of its outer child currently"))
}

func TestApplyColumnType(t *testing.T) {
Expand Down
4 changes: 4 additions & 0 deletions planner/core/exhaust_physical_plans.go
Original file line number Diff line number Diff line change
Expand Up @@ -2175,6 +2175,10 @@ func (la *LogicalApply) exhaustPhysicalPlans(prop *property.PhysicalProperty) ([
"MPP mode may be blocked because operator `Apply` is not supported now.")
return nil, true, nil
}
if !prop.IsEmpty() && la.SCtx().GetSessionVars().EnableParallelApply {
la.ctx.GetSessionVars().StmtCtx.AppendWarning(errors.Errorf("Parallel Apply rejects the possible order properties of its outer child currently"))
return nil, true, nil
}
disableAggPushDownToCop(la.children[0])
join := la.GetHashJoin(prop)
var columns = make([]*expression.Column, 0, len(la.CorCols))
Expand Down
31 changes: 31 additions & 0 deletions session/bootstrap.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,10 +37,12 @@ import (
"github.com/pingcap/tidb/infoschema"
"github.com/pingcap/tidb/parser"
"github.com/pingcap/tidb/parser/auth"
"github.com/pingcap/tidb/parser/model"
"github.com/pingcap/tidb/parser/mysql"
"github.com/pingcap/tidb/parser/terror"
"github.com/pingcap/tidb/planner/core"
"github.com/pingcap/tidb/sessionctx/variable"
"github.com/pingcap/tidb/table/tables"
"github.com/pingcap/tidb/types"
"github.com/pingcap/tidb/util/chunk"
"github.com/pingcap/tidb/util/dbterror"
Expand Down Expand Up @@ -2007,3 +2009,32 @@ func oldPasswordUpgrade(pass string) (string, error) {
newpass := fmt.Sprintf("*%X", hash2)
return newpass, nil
}

// rebuildAllPartitionValueMapAndSorted rebuilds all value map and sorted info for list column partitions with InfoSchema.
func rebuildAllPartitionValueMapAndSorted(s *session) {
type partitionExpr interface {
PartitionExpr() (*tables.PartitionExpr, error)
}

p := parser.New()
is := s.GetInfoSchema().(infoschema.InfoSchema)
for _, dbInfo := range is.AllSchemas() {
for _, t := range is.SchemaTables(dbInfo.Name) {
pi := t.Meta().GetPartitionInfo()
if pi == nil || pi.Type != model.PartitionTypeList {
continue
}

pe, err := t.(partitionExpr).PartitionExpr()
if err != nil {
panic("partition table gets partition expression failed")
}
for _, cp := range pe.ColPrunes {
if err = cp.RebuildPartitionValueMapAndSorted(p); err != nil {
logutil.BgLogger().Warn("build list column partition value map and sorted failed")
break
}
}
}
}
}
2 changes: 2 additions & 0 deletions session/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -2795,6 +2795,8 @@ func BootstrapSession(store kv.Storage) (*domain.Domain, error) {
return nil, err
}
collate.SetNewCollationEnabledForTest(newCollationEnabled)
// To deal with the location partition failure caused by inconsistent NewCollationEnabled values(see issue #32416).
rebuildAllPartitionValueMapAndSorted(ses[0])

err = updateMemoryConfigAndSysVar(ses[0])
if err != nil {
Expand Down
44 changes: 20 additions & 24 deletions table/tables/partition.go
Original file line number Diff line number Diff line change
Expand Up @@ -252,7 +252,6 @@ type ForListColumnPruning struct {
ExprCol *expression.Column
valueTp *types.FieldType
valueMap map[string]ListPartitionLocation
mu sync.RWMutex
sorted *btree.BTree

// To deal with the location partition failure caused by inconsistent NewCollationEnabled values(see issue #32416).
Expand Down Expand Up @@ -658,6 +657,7 @@ func (lp *ForListPruning) buildListColumnsPruner(ctx sessionctx.Context, tblInfo
columns []*expression.Column, names types.NameSlice) error {
pi := tblInfo.GetPartitionInfo()
schema := expression.NewSchema(columns...)
p := parser.New()
colPrunes := make([]*ForListColumnPruning, 0, len(pi.Columns))
for colIdx := range pi.Columns {
colInfo := model.FindColumnInfo(tblInfo.Columns, pi.Columns[colIdx].L)
Expand All @@ -679,7 +679,10 @@ func (lp *ForListPruning) buildListColumnsPruner(ctx sessionctx.Context, tblInfo
valueMap: make(map[string]ListPartitionLocation),
sorted: btree.New(btreeDegree),
}

err := colPrune.buildPartitionValueMapAndSorted(p)
if err != nil {
return err
}
colPrunes = append(colPrunes, colPrune)
}
lp.ColPrunes = colPrunes
Expand Down Expand Up @@ -760,22 +763,28 @@ func (lp *ForListPruning) locateListColumnsPartitionByRow(ctx sessionctx.Context
return location[0].PartIdx, nil
}

// buildListPartitionValueMapAndSorted builds list columns partition value map for the specified column.
// it also builds list columns partition value btree for the specified column.
// buildPartitionValueMapAndSorted builds list columns partition value map for the specified column.
// It also builds list columns partition value btree for the specified column.
// colIdx is the specified column index in the list columns.
func (lp *ForListColumnPruning) buildPartitionValueMapAndSorted() error {
lp.mu.RLock()
func (lp *ForListColumnPruning) buildPartitionValueMapAndSorted(p *parser.Parser) error {
l := len(lp.valueMap)
lp.mu.RUnlock()
if l != 0 {
return nil
}

p := parser.New()
return lp.buildListPartitionValueMapAndSorted(p)
}

// RebuildPartitionValueMapAndSorted rebuilds list columns partition value map for the specified column.
func (lp *ForListColumnPruning) RebuildPartitionValueMapAndSorted(p *parser.Parser) error {
lp.valueMap = make(map[string]ListPartitionLocation, len(lp.valueMap))
lp.sorted.Clear(false)
return lp.buildListPartitionValueMapAndSorted(p)
}

func (lp *ForListColumnPruning) buildListPartitionValueMapAndSorted(p *parser.Parser) error {
pi := lp.tblInfo.GetPartitionInfo()
sc := lp.ctx.GetSessionVars().StmtCtx
lp.mu.Lock()
defer lp.mu.Unlock()
for partitionIdx, def := range pi.Definitions {
for groupIdx, vs := range def.InValues {
keyBytes, err := lp.genConstExprKey(lp.ctx, sc, vs[lp.colIdx], lp.schema, lp.names, p)
Expand Down Expand Up @@ -830,19 +839,11 @@ func (lp *ForListColumnPruning) genKey(sc *stmtctx.StatementContext, v types.Dat

// LocatePartition locates partition by the column value
func (lp *ForListColumnPruning) LocatePartition(sc *stmtctx.StatementContext, v types.Datum) (ListPartitionLocation, error) {
// To deal with the location partition failure caused by inconsistent NewCollationEnabled values(see issue #32416).
err := lp.buildPartitionValueMapAndSorted()
if err != nil {
return nil, err
}

key, err := lp.genKey(sc, v)
if err != nil {
return nil, errors.Trace(err)
}
lp.mu.RLock()
location, ok := lp.valueMap[string(key)]
lp.mu.RUnlock()
if !ok {
return nil, nil
}
Expand All @@ -851,13 +852,8 @@ func (lp *ForListColumnPruning) LocatePartition(sc *stmtctx.StatementContext, v

// LocateRanges locates partition ranges by the column range
func (lp *ForListColumnPruning) LocateRanges(sc *stmtctx.StatementContext, r *ranger.Range) ([]ListPartitionLocation, error) {
// To deal with the location partition failure caused by inconsistent NewCollationEnabled values(see issue #32416).
err := lp.buildPartitionValueMapAndSorted()
if err != nil {
return nil, err
}

var lowKey, highKey []byte
var err error
lowVal := r.LowVal[0]
if r.LowVal[0].Kind() == types.KindMinNotNull {
lowVal = types.GetMinValue(lp.ExprCol.GetType())
Expand Down

0 comments on commit fb32d69

Please sign in to comment.