Skip to content

Commit

Permalink
tables: fix insert ignore on duplicate with dup prefix 2nd index (#25905
Browse files Browse the repository at this point in the history
)
  • Loading branch information
lysu committed Aug 23, 2021
1 parent dbc8017 commit 0e278a1
Show file tree
Hide file tree
Showing 3 changed files with 33 additions and 7 deletions.
2 changes: 1 addition & 1 deletion executor/write.go
Original file line number Diff line number Diff line change
Expand Up @@ -193,7 +193,7 @@ func updateRecord(ctx context.Context, sctx sessionctx.Context, h kv.Handle, old
if sc.DupKeyAsWarning {
// For `UPDATE IGNORE`/`INSERT IGNORE ON DUPLICATE KEY UPDATE`
// If the new handle or unique index exists, this will avoid to remove the record.
err = tables.CheckHandleOrUniqueKeyExistForUpdateIgnoreOrInsertOnDupIgnore(ctx, sctx, t, newHandle, newData, modified)
err = tables.CheckHandleOrUniqueKeyExistForUpdateIgnoreOrInsertOnDupIgnore(ctx, sctx, t, newHandle, newData, oldData, modified)
if err != nil {
if terr, ok := errors.Cause(err).(*terror.Error); sctx.GetSessionVars().StmtCtx.IgnoreNoPartition && ok && terr.Code() == errno.ErrNoPartitionForGivenValue {
return false, nil
Expand Down
6 changes: 6 additions & 0 deletions executor/write_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -812,6 +812,12 @@ func (s *testSuite4) TestInsertIgnoreOnDup(c *C) {
tk.MustExec("insert into t7(col_335, col_336) values(7685969, 'alice'),(2002468, 'bob')")
tk.MustExec("insert ignore into t7(col_335, col_336) values(2002468, 'david') on duplicate key update col_335 = 7685969")
tk.MustQuery("select * from t7").Check(testkit.Rows("-3217641 7685969 alice", "-3217641 2002468 bob"))

tk.MustExec("drop table if exists t8")
tk.MustExec("CREATE TABLE `t8` (`col_70` varbinary(444) NOT NULL DEFAULT 'bezhs', PRIMARY KEY (`col_70`) clustered, UNIQUE KEY `idx_22` (`col_70`(1)))")
tk.MustExec("insert into t8 values('lldcxiyfjrqzgj')")
tk.MustExec("insert ignore into t8 values ( 'lalozlkdosasfklmflo' ) on duplicate key update col_70 = 'lyhohxtby'")
tk.MustQuery("select * from t8").Check(testkit.Rows("lyhohxtby"))
}

func (s *testSuite4) TestInsertSetWithDefault(c *C) {
Expand Down
32 changes: 26 additions & 6 deletions table/tables/tables.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (

"github.com/opentracing/opentracing-go"
"github.com/pingcap/errors"
"github.com/pingcap/parser/charset"
"github.com/pingcap/parser/model"
"github.com/pingcap/parser/mysql"
"github.com/pingcap/tidb/kv"
Expand Down Expand Up @@ -1449,7 +1450,7 @@ func FindIndexByColName(t table.Table, name string) table.Index {

// CheckHandleOrUniqueKeyExistForUpdateIgnoreOrInsertOnDupIgnore check whether recordID key or unique index key exists. if not exists, return nil,
// otherwise return kv.ErrKeyExists error.
func CheckHandleOrUniqueKeyExistForUpdateIgnoreOrInsertOnDupIgnore(ctx context.Context, sctx sessionctx.Context, t table.Table, recordID kv.Handle, newRow []types.Datum, modified []bool) error {
func CheckHandleOrUniqueKeyExistForUpdateIgnoreOrInsertOnDupIgnore(ctx context.Context, sctx sessionctx.Context, t table.Table, recordID kv.Handle, newRow []types.Datum, oldRow []types.Datum, modified []bool) error {
physicalTableID := t.Meta().ID
idxs := t.Indices()
if pt, ok := t.(*partitionedTable); ok {
Expand Down Expand Up @@ -1482,20 +1483,39 @@ func CheckHandleOrUniqueKeyExistForUpdateIgnoreOrInsertOnDupIgnore(ctx context.C

// Check unique key exists.
{
shouldSkipIgnoreCheck := func(idx table.Index) bool {
shouldSkipIgnoreCheck := func(idx table.Index) (bool, error) {
if !IsIndexWritable(idx) || !idx.Meta().Unique || (t.Meta().IsCommonHandle && idx.Meta().Primary) {
return true
return true, nil
}
for _, c := range idx.Meta().Columns {
if modified[c.Offset] {
return false
if c.Length != types.UnspecifiedLength && (newRow[c.Offset].Kind() == types.KindString || newRow[c.Offset].Kind() == types.KindBytes) {
newCol := newRow[c.Offset].Clone()
tablecodec.TruncateIndexValue(newCol, c, t.Meta().Columns[c.Offset])
oldCol := oldRow[c.Offset].Clone()
tablecodec.TruncateIndexValue(oldCol, c, t.Meta().Columns[c.Offset])
// We should use binary collation to compare datum, otherwise the result will be incorrect.
newCol.SetCollation(charset.CollationBin)
cmp, err := newCol.CompareDatum(sctx.GetSessionVars().StmtCtx, oldCol)
if err != nil {
return false, errors.Trace(err)
}
if cmp == 0 {
continue
}
}
return false, nil
}
}
return true
return true, nil
}

for _, idx := range idxs {
if shouldSkipIgnoreCheck(idx) {
skip, err := shouldSkipIgnoreCheck(idx)
if err != nil {
return err
}
if skip {
continue
}
newVals, err := idx.FetchValues(newRow, nil)
Expand Down

0 comments on commit 0e278a1

Please sign in to comment.