Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

executor: fix data race in oomtest #30339

Merged
merged 3 commits into from
Dec 2, 2021
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
84 changes: 56 additions & 28 deletions executor/oomtest/oom_serial_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,13 +56,15 @@ func TestMemTracker4UpdateExec(t *testing.T) {
tk.MustExec("create table t_MemTracker4UpdateExec (id int, a int, b int, index idx_a(`a`))")

log.SetLevel(zap.InfoLevel)
oom.tracker = ""

oom.SetTracker("")

tk.MustExec("insert into t_MemTracker4UpdateExec values (1,1,1), (2,2,2), (3,3,3)")
require.Equal(t, "schemaLeaseChecker is not set for this transaction", oom.tracker)
require.Equal(t, "schemaLeaseChecker is not set for this transaction", oom.GetTracker())

tk.Session().GetSessionVars().MemQuotaQuery = 244
tk.MustExec("update t_MemTracker4UpdateExec set a = 4")
require.Equal(t, "expensive_query during bootstrap phase", oom.tracker)
require.Equal(t, "expensive_query during bootstrap phase", oom.GetTracker())
}

func TestMemTracker4InsertAndReplaceExec(t *testing.T) {
Expand All @@ -77,54 +79,62 @@ func TestMemTracker4InsertAndReplaceExec(t *testing.T) {
tk.MustExec("create table t_MemTracker4InsertAndReplaceExec (id int, a int, b int, index idx_a(`a`))")

log.SetLevel(zap.InfoLevel)
oom.tracker = ""

oom.SetTracker("")

tk.MustExec("insert into t_MemTracker4InsertAndReplaceExec values (1,1,1), (2,2,2), (3,3,3)")
require.Equal(t, "schemaLeaseChecker is not set for this transaction", oom.tracker)
require.Equal(t, "schemaLeaseChecker is not set for this transaction", oom.GetTracker())
tk.Session().GetSessionVars().MemQuotaQuery = 1
tk.MustExec("insert into t_MemTracker4InsertAndReplaceExec values (1,1,1), (2,2,2), (3,3,3)")
require.Equal(t, "expensive_query during bootstrap phase", oom.tracker)
require.Equal(t, "expensive_query during bootstrap phase", oom.GetTracker())
tk.Session().GetSessionVars().MemQuotaQuery = -1

oom.tracker = ""
oom.SetTracker("")

tk.MustExec("replace into t_MemTracker4InsertAndReplaceExec values (1,1,1), (2,2,2), (3,3,3)")
require.Equal(t, "", oom.tracker)
require.Equal(t, "", oom.GetTracker())
tk.Session().GetSessionVars().MemQuotaQuery = 1
tk.MustExec("replace into t_MemTracker4InsertAndReplaceExec values (1,1,1), (2,2,2), (3,3,3)")
require.Equal(t, "expensive_query during bootstrap phase", oom.tracker)
require.Equal(t, "expensive_query during bootstrap phase", oom.GetTracker())
tk.Session().GetSessionVars().MemQuotaQuery = -1

oom.tracker = ""
oom.SetTracker("")

tk.MustExec("insert into t_MemTracker4InsertAndReplaceExec select * from t")
require.Equal(t, "", oom.tracker)
require.Equal(t, "", oom.GetTracker())
tk.Session().GetSessionVars().MemQuotaQuery = 1
tk.MustExec("insert into t_MemTracker4InsertAndReplaceExec select * from t")
require.Equal(t, "expensive_query during bootstrap phase", oom.tracker)
require.Equal(t, "expensive_query during bootstrap phase", oom.GetTracker())
tk.Session().GetSessionVars().MemQuotaQuery = -1

oom.tracker = ""
oom.SetTracker("")

tk.MustExec("replace into t_MemTracker4InsertAndReplaceExec select * from t")
require.Equal(t, "", oom.tracker)
require.Equal(t, "", oom.GetTracker())
tk.Session().GetSessionVars().MemQuotaQuery = 1
tk.MustExec("replace into t_MemTracker4InsertAndReplaceExec select * from t")
require.Equal(t, "expensive_query during bootstrap phase", oom.tracker)
require.Equal(t, "expensive_query during bootstrap phase", oom.GetTracker())
tk.Session().GetSessionVars().MemQuotaQuery = -1

tk.Session().GetSessionVars().DMLBatchSize = 1
tk.Session().GetSessionVars().BatchInsert = true
oom.tracker = ""

oom.SetTracker("")

tk.MustExec("insert into t_MemTracker4InsertAndReplaceExec values (1,1,1), (2,2,2), (3,3,3)")
require.Equal(t, "", oom.tracker)
require.Equal(t, "", oom.GetTracker())
tk.Session().GetSessionVars().MemQuotaQuery = 1
tk.MustExec("insert into t_MemTracker4InsertAndReplaceExec values (1,1,1), (2,2,2), (3,3,3)")
require.Equal(t, "expensive_query during bootstrap phase", oom.tracker)
require.Equal(t, "expensive_query during bootstrap phase", oom.GetTracker())
tk.Session().GetSessionVars().MemQuotaQuery = -1

oom.tracker = ""
oom.SetTracker("")

tk.MustExec("replace into t_MemTracker4InsertAndReplaceExec values (1,1,1), (2,2,2), (3,3,3)")
require.Equal(t, "", oom.tracker)
require.Equal(t, "", oom.GetTracker())
tk.Session().GetSessionVars().MemQuotaQuery = 1
tk.MustExec("replace into t_MemTracker4InsertAndReplaceExec values (1,1,1), (2,2,2), (3,3,3)")
require.Equal(t, "expensive_query during bootstrap phase", oom.tracker)
require.Equal(t, "expensive_query during bootstrap phase", oom.GetTracker())
tk.Session().GetSessionVars().MemQuotaQuery = -1
}

Expand All @@ -140,27 +150,33 @@ func TestMemTracker4DeleteExec(t *testing.T) {
// delete from single table
log.SetLevel(zap.InfoLevel)
tk.MustExec("insert into MemTracker4DeleteExec1 values(1,1,1), (2,2,2), (3,3,3), (4,4,4), (5,5,5)")
oom.tracker = ""

oom.SetTracker("")

tk.MustExec("delete from MemTracker4DeleteExec1")
require.Equal(t, "", oom.tracker)
require.Equal(t, "", oom.GetTracker())
tk.MustExec("insert into MemTracker4DeleteExec1 values (1,1,1), (2,2,2), (3,3,3)")
tk.Session().GetSessionVars().MemQuotaQuery = 1
tk.MustExec("delete from MemTracker4DeleteExec1")
require.Equal(t, "expensive_query during bootstrap phase", oom.tracker)
require.Equal(t, "expensive_query during bootstrap phase", oom.GetTracker())

// delete from multiple table
tk.Session().GetSessionVars().MemQuotaQuery = 100000
tk.MustExec("insert into MemTracker4DeleteExec1 values(1,1,1)")
tk.MustExec("insert into MemTracker4DeleteExec2 values(1,1,1)")
oom.tracker = ""

oom.SetTracker("")

tk.MustExec("delete MemTracker4DeleteExec1, MemTracker4DeleteExec2 from MemTracker4DeleteExec1 join MemTracker4DeleteExec2 on MemTracker4DeleteExec1.a=MemTracker4DeleteExec2.a")
require.Equal(t, "", oom.tracker)
require.Equal(t, "", oom.GetTracker())
tk.MustExec("insert into MemTracker4DeleteExec1 values(1,1,1)")
tk.MustExec("insert into MemTracker4DeleteExec2 values(1,1,1)")
oom.tracker = ""

oom.SetTracker("")

tk.Session().GetSessionVars().MemQuotaQuery = 10000
tk.MustExec("delete MemTracker4DeleteExec1, MemTracker4DeleteExec2 from MemTracker4DeleteExec1 join MemTracker4DeleteExec2 on MemTracker4DeleteExec1.a=MemTracker4DeleteExec2.a")
require.Equal(t, "memory exceeds quota, rateLimitAction delegate to fallback action", oom.tracker)
require.Equal(t, "memory exceeds quota, rateLimitAction delegate to fallback action", oom.GetTracker())
}

var oom *oomCapture
Expand All @@ -179,6 +195,18 @@ type oomCapture struct {
mu sync.Mutex
}

func (h *oomCapture) SetTracker(tracker string) {
h.mu.Lock()
defer h.mu.Unlock()
h.tracker = tracker
}

func (h *oomCapture) GetTracker() string {
h.mu.Lock()
defer h.mu.Unlock()
return h.tracker
}

func (h *oomCapture) Write(entry zapcore.Entry, fields []zapcore.Field) error {
if entry.Message == "memory exceeds quota" {
err, _ := fields[0].Interface.(error)
Expand Down