Skip to content

Commit

Permalink
txn: handle errors returned from Flush (#52890)
Browse files Browse the repository at this point in the history
close #52889
  • Loading branch information
ekexium authored Apr 28, 2024
1 parent 70a8253 commit 024fdd2
Show file tree
Hide file tree
Showing 2 changed files with 37 additions and 4 deletions.
18 changes: 17 additions & 1 deletion pkg/session/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -846,7 +846,10 @@ func (s *session) tryReplaceWriteConflictError(oldErr error) (newErr error) {
}
originErr := errors.Cause(oldErr)
inErr, _ := originErr.(*errors.Error)
args := inErr.Args()
// we don't want to modify the oldErr, so copy the args list
oldArgs := inErr.Args()
args := make([]any, len(oldArgs))
copy(args, oldArgs)
is := sessiontxn.GetTxnManager(s).GetTxnInfoSchema()
if is == nil {
return nil
Expand Down Expand Up @@ -2274,6 +2277,19 @@ func runStmt(ctx context.Context, se *session, s sqlexec.Statement) (rs sqlexec.
}

rs, err = s.Exec(ctx)

if se.txn.Valid() && se.txn.IsPipelined() {
// Pipelined-DMLs can return assertion errors and write conflicts here because they flush
// during execution, handle these errors as we would handle errors after a commit.
if err != nil {
err = se.handleAssertionFailure(ctx, err)
}
newErr := se.tryReplaceWriteConflictError(err)
if newErr != nil {
err = newErr
}
}

sessVars.TxnCtx.StatementCount++
if rs != nil {
if se.GetSessionVars().StmtCtx.IsExplainAnalyzeDML {
Expand Down
23 changes: 20 additions & 3 deletions tests/realtikvtest/pipelineddmltest/pipelineddml_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -575,26 +575,43 @@ func TestPipelinedDMLCommitSkipSecondaries(t *testing.T) {
}

func TestPipelinedDMLDisableRetry(t *testing.T) {
// the case tests that
// 1. auto-retry for pipelined dml is disabled
// 2. the write conflict error message returned from a Flush (instead of from Commit) is correct
require.Nil(t, failpoint.Enable("tikvclient/pipelinedMemDBMinFlushKeys", `return(1)`))
require.Nil(t, failpoint.Enable("tikvclient/pipelinedMemDBMinFlushSize", `return(1)`))
require.Nil(t, failpoint.Enable("tikvclient/pipelinedMemDBForceFlushSizeThreshold", `return(1)`))
defer func() {
require.Nil(t, failpoint.Disable("tikvclient/pipelinedMemDBMinFlushKeys"))
require.Nil(t, failpoint.Disable("tikvclient/pipelinedMemDBMinFlushSize"))
require.Nil(t, failpoint.Disable("tikvclient/pipelinedMemDBForceFlushSizeThreshold"))
}()
store := realtikvtest.CreateMockStoreAndSetup(t)
tk1 := testkit.NewTestKit(t, store)
tk1.Session().GetSessionVars().InitChunkSize = 1
tk1.Session().GetSessionVars().MaxChunkSize = 1
tk2 := testkit.NewTestKit(t, store)
tk1.MustExec("use test")
tk2.MustExec("use test")
tk1.MustExec("drop table if exists t1")
tk1.MustExec("create table t1(a int primary key, b int)")
tk1.MustExec("insert into t1 values(1, 1)")
// we need to avoid inserting *literals* into t, so let t2 be the source table.
tk1.MustExec("create table t2(a int, b int)")
tk1.MustExec("insert into t2 values (1, 1), (2, 1)")
require.Nil(t, failpoint.Enable("tikvclient/beforePipelinedFlush", `pause`))
tk1.MustExec("set session tidb_dml_type = bulk")
errCh := make(chan error)
go func() {
errCh <- tk1.ExecToErr("update t1 set b = b + 20")
// we expect that this stmt triggers 2 flushes, each containing only 1 row.
errCh <- tk1.ExecToErr("insert into t1 select * from t2 order by a")
}()
time.Sleep(500 * time.Millisecond)
tk2.MustExec("update t1 set b = b + 10")
tk2.MustExec("insert into t1 values (1,2)")
require.Nil(t, failpoint.Disable("tikvclient/beforePipelinedFlush"))
err := <-errCh
require.Error(t, err)
require.True(t, kv.ErrWriteConflict.Equal(err), fmt.Sprintf("error: %s", err))
require.ErrorContains(t, err, "tableName=test.t1")
}

func TestReplaceRowCheck(t *testing.T) {
Expand Down

0 comments on commit 024fdd2

Please sign in to comment.