Skip to content

Commit

Permalink
server: wait for auto commit queries before shutdown (pingcap#55494)
Browse files Browse the repository at this point in the history
  • Loading branch information
YangKeao committed Aug 20, 2024
1 parent ed9aa18 commit c1c74b1
Show file tree
Hide file tree
Showing 3 changed files with 45 additions and 22 deletions.
15 changes: 10 additions & 5 deletions pkg/server/conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -1126,17 +1126,22 @@ func (cc *clientConn) Run(ctx context.Context) {
return
}

// Should check InTxn() to avoid execute `begin` stmt.
// It should be CAS before checking the `inShutdownMode` to avoid the following scenario:
// 1. The connection checks the `inShutdownMode` and it's false.
// 2. The server sets the `inShutdownMode` to true. The `DrainClients` process ignores this connection
// because the connection is in the `connStatusReading` status.
// 3. The connection changes its status to `connStatusDispatching` and starts to execute the command.
if !cc.CompareAndSwapStatus(connStatusReading, connStatusDispatching) {
return
}

// Should check InTxn() to avoid execute `begin` stmt and allow executing statements in the not committed txn.
if cc.server.inShutdownMode.Load() {
if !cc.ctx.GetSessionVars().InTxn() {
return
}
}

if !cc.CompareAndSwapStatus(connStatusReading, connStatusDispatching) {
return
}

startTime := time.Now()
err = cc.dispatch(ctx, data)
cc.ctx.GetSessionVars().ClearAlloc(&cc.chunkAlloc, err != nil)
Expand Down
49 changes: 33 additions & 16 deletions pkg/server/conn_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -784,22 +784,39 @@ func TestShutDown(t *testing.T) {
cc = &clientConn{server: srv}
cc.SetCtx(tc)

// test in txn
srv.clients[dom.NextConnID()] = cc
cc.getCtx().GetSessionVars().SetInTxn(true)

waitTime := 100 * time.Millisecond
begin := time.Now()
srv.DrainClients(waitTime, waitTime)
require.Greater(t, time.Since(begin), waitTime)

// test not in txn
srv.clients[dom.NextConnID()] = cc
cc.getCtx().GetSessionVars().SetInTxn(false)

begin = time.Now()
srv.DrainClients(waitTime, waitTime)
require.Less(t, time.Since(begin), waitTime)
waitMap := [][]bool{
// Reading, Not Reading
{false, true}, // Not InTxn
{true, true}, // InTxn
}
for idx, waitMap := range waitMap {
inTxn := idx > 0
for idx, shouldWait := range waitMap {
reading := idx == 0
if inTxn {
cc.getCtx().GetSessionVars().SetInTxn(true)
} else {
cc.getCtx().GetSessionVars().SetInTxn(false)
}
if reading {
cc.CompareAndSwapStatus(cc.getStatus(), connStatusReading)
} else {
cc.CompareAndSwapStatus(cc.getStatus(), connStatusDispatching)
}

srv.clients[dom.NextConnID()] = cc

waitTime := 100 * time.Millisecond
begin := time.Now()
srv.DrainClients(waitTime, waitTime)

if shouldWait {
require.Greater(t, time.Since(begin), waitTime)
} else {
require.Less(t, time.Since(begin), waitTime)
}
}
}
}

type snapshotCache interface {
Expand Down
3 changes: 2 additions & 1 deletion pkg/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -1004,7 +1004,8 @@ func (s *Server) DrainClients(drainWait time.Duration, cancelWait time.Duration)
go func() {
defer close(allDone)
for _, conn := range conns {
if !conn.getCtx().GetSessionVars().InTxn() {
// Wait for the connections with explicit transaction or an executing auto-commit query.
if conn.getStatus() == connStatusReading && !conn.getCtx().GetSessionVars().InTxn() {
continue
}
select {
Expand Down

0 comments on commit c1c74b1

Please sign in to comment.