Skip to content

Commit

Permalink
test: stablize p-dml test & handle cache table when init store (#52093)
Browse files Browse the repository at this point in the history
ref #50215, close #52092
  • Loading branch information
you06 committed Mar 28, 2024
1 parent 7a2f1cc commit 04c4506
Show file tree
Hide file tree
Showing 2 changed files with 3 additions and 65 deletions.
65 changes: 0 additions & 65 deletions tests/realtikvtest/pipelineddmltest/pipelineddml_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -568,71 +568,6 @@ func TestPipelinedDMLCommitSkipSecondaries(t *testing.T) {
tk.MustQuery("select count(1) from t").Check(testkit.Rows("200"))
}

func TestPipelinedDMLInsertMemoryTest(t *testing.T) {
require.Nil(t, failpoint.Enable("tikvclient/pipelinedMemDBMinFlushKeys", `return(10)`))
require.Nil(t, failpoint.Enable("tikvclient/pipelinedMemDBMinFlushSize", `return(128)`))
require.Nil(t, failpoint.Enable("tikvclient/pipelinedMemDBForceFlushSizeThreshold", `return(128)`))
defer func() {
require.Nil(t, failpoint.Disable("tikvclient/pipelinedMemDBMinFlushKeys"))
require.Nil(t, failpoint.Disable("tikvclient/pipelinedMemDBMinFlushSize"))
require.Nil(t, failpoint.Disable("tikvclient/pipelinedMemDBForceFlushSizeThreshold"))
}()

store := realtikvtest.CreateMockStoreAndSetup(t)
tk := testkit.NewTestKit(t, store)
tk.MustExec("use test")
tk.MustExec("drop table if exists t1, _t1")
tk.MustExec("create table t1 (a int, b int, c varchar(128), unique index idx(b))")
tk.MustExec("create table _t1 like t1")
cnt := 1000

// insertStmt
buf := bytes.NewBuffer(make([]byte, 0, 10240))
buf.WriteString("insert into t1 values ")
for i := 0; i < cnt; i++ {
if i > 0 {
buf.WriteString(", ")
}
buf.WriteString(fmt.Sprintf("(%d, %d, 'abcdefghijklmnopqrstuvwxyz1234567890,.?+-=_!@#$&*()_+')", i, i))
}
tk.MustExec(buf.String())
tk.MustQuery("select count(*) from t1").Check(testkit.Rows(fmt.Sprintf("%d", cnt)))

// insert
tk.MustExec("set global tidb_mem_oom_action = 'CANCEL'") // query canceled by memory controller will return error.
tk.MustExec("set session tidb_mem_quota_query = 256 << 10") // 256KB limitation.
tk.MustExec("set session tidb_max_chunk_size = 32")
insertStmt := "insert into _t1 select * from t1"
err := tk.ExecToErr(insertStmt)
require.Error(t, err)
require.True(t, strings.Contains(err.Error(), "Your query has been cancelled due to exceeding the allowed memory limit for a single SQL query. Please try narrowing your query scope or increase the tidb_mem_quota_query limit and try again."), err.Error())
tk.MustExec("set session tidb_dml_type = bulk")
tk.MustExec(insertStmt)
tk.MustQuery("select count(*) from _t1").Check(testkit.Rows(fmt.Sprintf("%d", cnt)))

// update
tk.MustExec("set session tidb_mem_quota_query = 256 << 10") // 256KB limitation.
updateStmt := "update _t1 set c = 'abcdefghijklmnopqrstuvwxyz1234567890,.?+-=_!@#$&*()_++++++'"
tk.MustExec("set session tidb_dml_type = standard")
err = tk.ExecToErr(updateStmt)
require.Error(t, err)
require.True(t, strings.Contains(err.Error(), "Your query has been cancelled due to exceeding the allowed memory limit for a single SQL query. Please try narrowing your query scope or increase the tidb_mem_quota_query limit and try again."), err.Error())
tk.MustExec("set session tidb_dml_type = bulk")
tk.MustExec(updateStmt)
tk.MustQuery("select count(*) from _t1 where c = 'abcdefghijklmnopqrstuvwxyz1234567890,.?+-=_!@#$&*()_++++++'").Check(testkit.Rows(fmt.Sprintf("%d", cnt)))

// delete
tk.MustExec("set session tidb_mem_quota_query = 128 << 10") // 128KB limitation.
deleteStmt := "delete from _t1"
tk.MustExec("set session tidb_dml_type = standard")
err = tk.ExecToErr(deleteStmt)
require.Error(t, err)
require.True(t, strings.Contains(err.Error(), "Your query has been cancelled due to exceeding the allowed memory limit for a single SQL query. Please try narrowing your query scope or increase the tidb_mem_quota_query limit and try again."), err.Error())
tk.MustExec("set session tidb_dml_type = bulk")
tk.MustExec(deleteStmt)
tk.MustQuery("select count(*) from _t1").Check(testkit.Rows("0"))
}

func TestPipelinedDMLDisableRetry(t *testing.T) {
store := realtikvtest.CreateMockStoreAndSetup(t)
tk1 := testkit.NewTestKit(t, store)
Expand Down
3 changes: 3 additions & 0 deletions tests/realtikvtest/testkit.go
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,9 @@ func CreateMockStoreAndDomainAndSetup(t *testing.T, opts ...mockstore.MockTiKVSt
for _, row := range rs.Rows() {
tables = append(tables, fmt.Sprintf("`%v`", row[0]))
}
for _, table := range tables {
tk.MustExec(fmt.Sprintf("alter table %s nocache", table))
}
if len(tables) > 0 {
tk.MustExec(fmt.Sprintf("drop table %s", strings.Join(tables, ",")))
}
Expand Down

0 comments on commit 04c4506

Please sign in to comment.