diff --git a/executor/write_test.go b/executor/write_test.go index eb444a305b7a1..033798c754a82 100644 --- a/executor/write_test.go +++ b/executor/write_test.go @@ -1815,8 +1815,7 @@ func (s *testBypassSuite) TestBypassLatch(c *C) { // txn1 and txn2 data range do not overlap, but using latches result in txn conflict. fn() - _, err = tk1.Exec("commit") - c.Assert(err, NotNil) + tk1.MustExec("commit") tk1.MustExec("truncate table t") fn() diff --git a/store/tikv/latch/latch.go b/store/tikv/latch/latch.go index a8ca718947490..f6f57a460b106 100644 --- a/store/tikv/latch/latch.go +++ b/store/tikv/latch/latch.go @@ -14,6 +14,7 @@ package latch import ( + "bytes" "math/bits" "sort" "sync" @@ -22,32 +23,26 @@ import ( "github.com/spaolacci/murmur3" ) -// latch stores a key's waiting transactions information. -type latch struct { - // Whether there is any transaction in waitingQueue except head. - hasMoreWaiting bool - // The startTS of the transaction which is the head of waiting transactions. - waitingQueueHead uint64 - maxCommitTS uint64 - sync.Mutex -} +type node struct { + slotID int + key []byte + maxCommitTS uint64 + value *Lock -func (l *latch) isEmpty() bool { - return l.waitingQueueHead == 0 && !l.hasMoreWaiting + next *node } -func (l *latch) free() { - l.waitingQueueHead = 0 -} - -func (l *latch) refreshCommitTS(commitTS uint64) { - l.Lock() - defer l.Unlock() - l.maxCommitTS = mathutil.MaxUint64(commitTS, l.maxCommitTS) +// latch stores a key's waiting transactions information. +type latch struct { + queue *node + count int + waiting []*Lock + sync.Mutex } // Lock is the locks' information required for a transaction. type Lock struct { + keys [][]byte // The slot IDs of the latches(keys) that a startTS must acquire before being able to processed. requiredSlots []int // The number of latches that the transaction has acquired. For status is stale, it include the @@ -96,9 +91,20 @@ func (l *Lock) SetCommitTS(commitTS uint64) { // but conceptually a latch is a queue, and a slot is an index to the queue type Latches struct { slots []latch - // The waiting queue for each slot(slotID => slice of Lock). - waitingQueues map[int][]*Lock - sync.RWMutex +} + +type bytesSlice [][]byte + +func (s bytesSlice) Len() int { + return len(s) +} + +func (s bytesSlice) Swap(i, j int) { + s[i], s[j] = s[j], s[i] +} + +func (s bytesSlice) Less(i, j int) bool { + return bytes.Compare(s[i], s[j]) < 0 } // NewLatches create a Latches with fixed length, @@ -107,14 +113,15 @@ func NewLatches(size uint) *Latches { powerOfTwoSize := 1 << uint32(bits.Len32(uint32(size-1))) slots := make([]latch, powerOfTwoSize) return &Latches{ - slots: slots, - waitingQueues: make(map[int][]*Lock), + slots: slots, } } // genLock generates Lock for the transaction with startTS and keys. func (latches *Latches) genLock(startTS uint64, keys [][]byte) *Lock { + sort.Sort(bytesSlice(keys)) return &Lock{ + keys: keys, requiredSlots: latches.genSlotIDs(keys), acquiredCount: 0, startTS: startTS, @@ -126,17 +133,7 @@ func (latches *Latches) genSlotIDs(keys [][]byte) []int { for _, key := range keys { slots = append(slots, latches.slotID(key)) } - sort.Ints(slots) - if len(slots) <= 1 { - return slots - } - dedup := slots[:1] - for i := 1; i < len(slots); i++ { - if slots[i] != slots[i-1] { - dedup = append(dedup, slots[i]) - } - } - return dedup + return slots } // slotID return slotID for current key. @@ -150,8 +147,7 @@ func (latches *Latches) acquire(lock *Lock) acquireResult { return acquireStale } for lock.acquiredCount < len(lock.requiredSlots) { - slotID := lock.requiredSlots[lock.acquiredCount] - status := latches.acquireSlot(slotID, lock) + status := latches.acquireSlot(lock) if status != acquireSuccess { return status } @@ -161,75 +157,129 @@ func (latches *Latches) acquire(lock *Lock) acquireResult { // release releases all latches owned by the `lock` and returns the wakeup list. // Preconditions: the caller must ensure the transaction's status is not locked. -func (latches *Latches) release(lock *Lock, commitTS uint64, wakeupList []*Lock) []*Lock { +func (latches *Latches) release(lock *Lock, wakeupList []*Lock) []*Lock { wakeupList = wakeupList[:0] - for i := 0; i < lock.acquiredCount; i++ { - slotID := lock.requiredSlots[i] - if nextLock := latches.releaseSlot(slotID, commitTS); nextLock != nil { + for lock.acquiredCount > 0 { + if nextLock := latches.releaseSlot(lock); nextLock != nil { wakeupList = append(wakeupList, nextLock) } } return wakeupList } -// refreshCommitTS refreshes commitTS for keys. -func (latches *Latches) refreshCommitTS(keys [][]byte, commitTS uint64) { - slotIDs := latches.genSlotIDs(keys) - for _, slotID := range slotIDs { - latches.slots[slotID].refreshCommitTS(commitTS) - } -} - -func (latches *Latches) releaseSlot(slotID int, commitTS uint64) (nextLock *Lock) { +func (latches *Latches) releaseSlot(lock *Lock) (nextLock *Lock) { + key := lock.keys[lock.acquiredCount-1] + slotID := lock.requiredSlots[lock.acquiredCount-1] latch := &latches.slots[slotID] + lock.acquiredCount-- latch.Lock() defer latch.Unlock() - latch.maxCommitTS = mathutil.MaxUint64(latch.maxCommitTS, commitTS) - if !latch.hasMoreWaiting { - latch.free() + + find := findNode(latch.queue, key) + if find.value != lock { + panic("releaseSlot wrong") + } + find.maxCommitTS = mathutil.MaxUint64(find.maxCommitTS, lock.commitTS) + find.value = nil + if len(latch.waiting) == 0 { return nil } - nextLock, latch.hasMoreWaiting = latches.popFromWaitingQueue(slotID) - latch.waitingQueueHead = nextLock.startTS - nextLock.acquiredCount++ - if latch.maxCommitTS > nextLock.startTS { - nextLock.isStale = true + + var idx int + for idx = 0; idx < len(latch.waiting); idx++ { + waiting := latch.waiting[idx] + if bytes.Compare(waiting.keys[waiting.acquiredCount], key) == 0 { + break + } } - return nextLock -} + // Wake up the first one in waiting queue. + if idx < len(latch.waiting) { + nextLock = latch.waiting[idx] + // Delete element latch.waiting[idx] from the array. + copy(latch.waiting[idx:], latch.waiting[idx+1:]) + latch.waiting[len(latch.waiting)-1] = nil + latch.waiting = latch.waiting[:len(latch.waiting)-1] -func (latches *Latches) popFromWaitingQueue(slotID int) (front *Lock, hasMoreWaiting bool) { - latches.Lock() - defer latches.Unlock() - waiting := latches.waitingQueues[slotID] - front = waiting[0] - if len(waiting) == 1 { - delete(latches.waitingQueues, slotID) - } else { - latches.waitingQueues[slotID] = waiting[1:] - hasMoreWaiting = true + if find.maxCommitTS > nextLock.startTS { + nextLock.isStale = true + } } + return } -func (latches *Latches) acquireSlot(slotID int, lock *Lock) acquireResult { +func (latches *Latches) acquireSlot(lock *Lock) acquireResult { + key := lock.keys[lock.acquiredCount] + slotID := lock.requiredSlots[lock.acquiredCount] latch := &latches.slots[slotID] latch.Lock() defer latch.Unlock() - if latch.maxCommitTS > lock.startTS { + + // Try to recycle to limit the memory usage. + if latch.count >= latchListCount { + latch.recycle(lock.startTS) + } + + find := findNode(latch.queue, key) + if find == nil { + tmp := &node{ + slotID: slotID, + key: key, + value: lock, + } + tmp.next = latch.queue + latch.queue = tmp + latch.count++ + + lock.acquiredCount++ + return acquireSuccess + } + + if find.maxCommitTS > lock.startTS { lock.isStale = true return acquireStale } - if latch.isEmpty() { - latch.waitingQueueHead = lock.startTS + if find.value == nil { + find.value = lock lock.acquiredCount++ return acquireSuccess } + // Push the current transaction into waitingQueue. - latch.hasMoreWaiting = true - latches.Lock() - defer latches.Unlock() - latches.waitingQueues[slotID] = append(latches.waitingQueues[slotID], lock) + latch.waiting = append(latch.waiting, lock) return acquireLocked } + +// recycle is not thread safe, the latch should acquire its lock before executing this function. +func (l *latch) recycle(currentTS uint64) { + fakeHead := node{next: l.queue} + prev := &fakeHead + for curr := prev.next; curr != nil; curr = curr.next { + if tsoSub(currentTS, curr.maxCommitTS) >= expireDuration && curr.value == nil { + l.count-- + prev.next = curr.next + } else { + prev = curr + } + } + l.queue = fakeHead.next +} + +func (latches *Latches) recycle(currentTS uint64) { + for i := 0; i < len(latches.slots); i++ { + latch := &latches.slots[i] + latch.Lock() + latch.recycle(currentTS) + latch.Unlock() + } +} + +func findNode(list *node, key []byte) *node { + for n := list; n != nil; n = n.next { + if bytes.Compare(n.key, key) == 0 { + return n + } + } + return nil +} diff --git a/store/tikv/latch/latch_test.go b/store/tikv/latch/latch_test.go index 17178dd5c7850..951a9e3de1802 100644 --- a/store/tikv/latch/latch_test.go +++ b/store/tikv/latch/latch_test.go @@ -16,8 +16,10 @@ package latch import ( "sync/atomic" "testing" + "time" . "github.com/pingcap/check" + "github.com/pingcap/tidb/store/tikv/oracle" ) func TestT(t *testing.T) { @@ -48,7 +50,7 @@ func getTso() uint64 { func (s *testLatchSuite) TestWakeUp(c *C) { keysA := [][]byte{ - []byte("a"), []byte("b"), []byte("c"), []byte("c")} + []byte("a"), []byte("b"), []byte("c")} _, lockA := s.newLock(keysA) keysB := [][]byte{[]byte("d"), []byte("e"), []byte("a"), []byte("c")} @@ -65,7 +67,8 @@ func (s *testLatchSuite) TestWakeUp(c *C) { // A release lock, and get wakeup list. commitTSA := getTso() wakeupList := make([]*Lock, 0) - wakeupList = s.latches.release(lockA, commitTSA, wakeupList) + lockA.SetCommitTS(commitTSA) + wakeupList = s.latches.release(lockA, wakeupList) c.Assert(wakeupList[0].startTS, Equals, startTSB) // B acquire failed since startTSB has stale for some keys. @@ -73,7 +76,7 @@ func (s *testLatchSuite) TestWakeUp(c *C) { c.Assert(result, Equals, acquireStale) // B release lock since it received a stale. - wakeupList = s.latches.release(lockB, 0, wakeupList) + wakeupList = s.latches.release(lockB, wakeupList) c.Assert(wakeupList, HasLen, 0) // B restart:get a new startTS. @@ -85,7 +88,7 @@ func (s *testLatchSuite) TestWakeUp(c *C) { func (s *testLatchSuite) TestFirstAcquireFailedWithStale(c *C) { keys := [][]byte{ - []byte("a"), []byte("b"), []byte("c"), []byte("c")} + []byte("a"), []byte("b"), []byte("c")} _, lockA := s.newLock(keys) startTSB, lockB := s.newLock(keys) // acquire lockA success @@ -94,11 +97,53 @@ func (s *testLatchSuite) TestFirstAcquireFailedWithStale(c *C) { // release lockA commitTSA := getTso() wakeupList := make([]*Lock, 0) - s.latches.release(lockA, commitTSA, wakeupList) + lockA.SetCommitTS(commitTSA) + s.latches.release(lockA, wakeupList) c.Assert(commitTSA, Greater, startTSB) // acquire lockB first time, should be failed with stale since commitTSA > startTSB result = s.latches.acquire(lockB) c.Assert(result, Equals, acquireStale) - s.latches.release(lockB, 0, wakeupList) + s.latches.release(lockB, wakeupList) +} + +func (s *testLatchSuite) TestRecycle(c *C) { + latches := NewLatches(8) + now := time.Now() + startTS := oracle.ComposeTS(oracle.GetPhysical(now), 0) + lock := latches.genLock(startTS, [][]byte{ + []byte("a"), []byte("b"), + }) + lock1 := latches.genLock(startTS, [][]byte{ + []byte("b"), []byte("c"), + }) + c.Assert(latches.acquire(lock), Equals, acquireSuccess) + c.Assert(latches.acquire(lock1), Equals, acquireLocked) + lock.SetCommitTS(startTS + 1) + var wakeupList []*Lock + latches.release(lock, wakeupList) + + lock2 := latches.genLock(startTS+3, [][]byte{ + []byte("b"), []byte("c"), + }) + c.Assert(latches.acquire(lock2), Equals, acquireSuccess) + wakeupList = wakeupList[:0] + latches.release(lock2, wakeupList) + + allEmpty := true + for i := 0; i < len(latches.slots); i++ { + latch := &latches.slots[i] + if latch.queue != nil { + allEmpty = false + } + } + c.Assert(allEmpty, IsFalse) + + currentTS := oracle.ComposeTS(oracle.GetPhysical(now.Add(expireDuration)), 3) + latches.recycle(currentTS) + + for i := 0; i < len(latches.slots); i++ { + latch := &latches.slots[i] + c.Assert(latch.queue, IsNil) + } } diff --git a/store/tikv/latch/scheduler.go b/store/tikv/latch/scheduler.go index f3ffad7a77d9f..5f4e84cd7760d 100644 --- a/store/tikv/latch/scheduler.go +++ b/store/tikv/latch/scheduler.go @@ -15,15 +15,19 @@ package latch import ( "sync" + "time" + + "github.com/pingcap/tidb/store/tikv/oracle" ) const lockChanSize = 100 // LatchesScheduler is used to schedule latches for transactions. type LatchesScheduler struct { - latches *Latches - unlockCh chan *Lock - closed bool + latches *Latches + unlockCh chan *Lock + closed bool + lastRecycleTime uint64 sync.RWMutex } @@ -40,13 +44,31 @@ func NewScheduler(size uint) *LatchesScheduler { return scheduler } +// A transaction can last for at most 10 minutes, see also gcworker. +const expireDuration = 10 * time.Minute +const checkInterval = 5 * time.Minute +const checkCounter = 50000 +const latchListCount = 5 + func (scheduler *LatchesScheduler) run() { + var counter int wakeupList := make([]*Lock, 0) for lock := range scheduler.unlockCh { - wakeupList = scheduler.latches.release(lock, lock.commitTS, wakeupList) + wakeupList = scheduler.latches.release(lock, wakeupList) if len(wakeupList) > 0 { scheduler.wakeup(wakeupList) } + + if lock.commitTS > lock.startTS { + currentTS := lock.commitTS + elapsed := tsoSub(currentTS, scheduler.lastRecycleTime) + if elapsed > checkInterval && counter > checkCounter { + go scheduler.latches.recycle(lock.commitTS) + scheduler.lastRecycleTime = currentTS + counter = 0 + } + } + counter++ } } @@ -92,8 +114,8 @@ func (scheduler *LatchesScheduler) UnLock(lock *Lock) { } } -// RefreshCommitTS refreshes commitTS for keys. It could be used for the transaction not retryable, -// which would do 2PC directly and wouldn't get a lock. -func (scheduler *LatchesScheduler) RefreshCommitTS(keys [][]byte, commitTS uint64) { - scheduler.latches.refreshCommitTS(keys, commitTS) +func tsoSub(ts1, ts2 uint64) time.Duration { + t1 := oracle.GetTimeFromTS(ts1) + t2 := oracle.GetTimeFromTS(ts2) + return t1.Sub(t2) } diff --git a/store/tikv/latch/scheduler_test.go b/store/tikv/latch/scheduler_test.go index d57737fb9512d..fc2d5b1514d0f 100644 --- a/store/tikv/latch/scheduler_test.go +++ b/store/tikv/latch/scheduler_test.go @@ -29,7 +29,7 @@ func (s *testSchedulerSuite) SetUpTest(c *C) { func (s *testSchedulerSuite) TestWithConcurrency(c *C) { txns := [][][]byte{ - {[]byte("a"), []byte("a"), []byte("b"), []byte("c")}, + {[]byte("a"), []byte("b"), []byte("c")}, {[]byte("a"), []byte("d"), []byte("e"), []byte("f")}, {[]byte("e"), []byte("f"), []byte("g"), []byte("h")}, } diff --git a/store/tikv/txn.go b/store/tikv/txn.go index b1edef7da5b97..18b7e1a17d31f 100644 --- a/store/tikv/txn.go +++ b/store/tikv/txn.go @@ -199,9 +199,6 @@ func (txn *tikvTxn) Commit(ctx context.Context) error { // When bypassLatch flag is true, commit directly. if bypassLatch { err = committer.executeAndWriteFinishBinlog(ctx) - if err == nil { - txn.store.txnLatches.RefreshCommitTS(committer.keys, committer.commitTS) - } log.Debug("[kv]", connID, " txnLatches enabled while txn not retryable, 2pc directly:", err) return errors.Trace(err) }