Cherry pick #566 and #585 to tidb-6.1 (#599)
* Clear intersecting regions in the cache when inserting a region (#566)

Signed-off-by: Yilin Chen <sticnarf@gmail.com>

* Acquire read lock in LockKeys to avoid data race (#585)

Signed-off-by: Yilin Chen <sticnarf@gmail.com>

Signed-off-by: Yilin Chen <sticnarf@gmail.com>

Signed-off-by: Yilin Chen <sticnarf@gmail.com>
sticnarf committed Oct 12, 2022
1 parent ee63982 commit cc4fbe7
Showing 5 changed files with 101 additions and 1 deletion.
27 changes: 27 additions & 0 deletions internal/locate/region_cache.go
@@ -1243,6 +1243,33 @@ func (c *RegionCache) insertRegionToCache(cachedRegion *Region) {
if !ok || latest.GetVer() < newVer.GetVer() || latest.GetConfVer() < newVer.GetConfVer() {
c.mu.latestVersions[cachedRegion.VerID().id] = newVer
}
// The intersecting regions in the cache are probably stale, clear them.
deleted := c.removeIntersecting(cachedRegion)
for _, r := range deleted {
c.removeVersionFromCache(r.cachedRegion.VerID(), r.cachedRegion.GetID())
}
}

// removeIntersecting removes all items that have intersection with the key range of given region.
// If the region itself is in the cache, it's not removed.
func (c *RegionCache) removeIntersecting(r *Region) []*btreeItem {
var deleted []*btreeItem
c.mu.sorted.AscendGreaterOrEqual(newBtreeSearchItem(r.StartKey()), func(item_ btree.Item) bool {
item := item_.(*btreeItem)
// Skip the item that is equal to the given region.
if item.cachedRegion.VerID() == r.VerID() {
return true
}
if len(r.EndKey()) > 0 && bytes.Compare(item.cachedRegion.StartKey(), r.EndKey()) >= 0 {
return false
}
deleted = append(deleted, item)
return true
})
for _, item := range deleted {
c.mu.sorted.Delete(item)
}
return deleted
}

// searchCachedRegion finds a region from cache by key. Like `getCachedRegion`,
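
The hunk above works off the cache's ordered index (a B-tree keyed by region start key): scan forward from the new region's start key and drop every cached range that begins before the new region's end key. Below is a minimal, self-contained sketch of that eviction idea using github.com/google/btree directly; the cacheItem type, the insertAndEvict helper, and the [start, end) interval layout are illustrative assumptions, not client-go APIs.

package main

import (
	"bytes"
	"fmt"

	"github.com/google/btree"
)

// cacheItem is a cached key range; an empty end means "unbounded to the right".
type cacheItem struct {
	start, end []byte
	id         uint64
}

// Less orders items by start key, matching how a region cache keeps ranges sorted.
func (i *cacheItem) Less(than btree.Item) bool {
	return bytes.Compare(i.start, than.(*cacheItem).start) < 0
}

// insertAndEvict inserts newItem and removes every cached range whose start key
// falls inside newItem's range, mirroring the AscendGreaterOrEqual scan in the diff.
func insertAndEvict(tree *btree.BTree, newItem *cacheItem) []*cacheItem {
	var stale []*cacheItem
	tree.AscendGreaterOrEqual(&cacheItem{start: newItem.start}, func(it btree.Item) bool {
		cur := it.(*cacheItem)
		if cur.id == newItem.id {
			return true // the region itself stays in the cache
		}
		// Stop once we pass the new range's end key (empty end == unbounded).
		if len(newItem.end) > 0 && bytes.Compare(cur.start, newItem.end) >= 0 {
			return false
		}
		stale = append(stale, cur)
		return true
	})
	for _, it := range stale {
		tree.Delete(it)
	}
	tree.ReplaceOrInsert(newItem)
	return stale
}

func main() {
	tree := btree.New(32)
	tree.ReplaceOrInsert(&cacheItem{start: []byte("a"), end: []byte("b"), id: 1})
	tree.ReplaceOrInsert(&cacheItem{start: []byte("b"), end: []byte("c"), id: 2})
	tree.ReplaceOrInsert(&cacheItem{start: []byte("c"), end: []byte("d"), id: 3})
	// A merged region [a, d) replaces the three stale ranges above.
	evicted := insertAndEvict(tree, &cacheItem{start: []byte("a"), end: []byte("d"), id: 4})
	fmt.Println("evicted:", len(evicted), "remaining:", tree.Len()) // evicted: 3 remaining: 1
}

As in the diff, an empty end key is treated as unbounded, which is why the merge of the last region gets its own case in the test below.
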
56 changes: 56 additions & 0 deletions internal/locate/region_cache_test.go
@@ -1549,3 +1549,59 @@ func (s *testRegionCacheSuite) TestLocateBucket() {
s.True(b.Contains(key))
}
}

func (s *testRegionCacheSuite) TestRemoveIntersectingRegions() {
// Split at "b", "c", "d", "e"
regions := s.cluster.AllocIDs(4)
regions = append([]uint64{s.region1}, regions...)

peers := [][]uint64{{s.peer1, s.peer2}}
for i := 0; i < 4; i++ {
peers = append(peers, s.cluster.AllocIDs(2))
}

for i := 0; i < 4; i++ {
s.cluster.Split(regions[i], regions[i+1], []byte{'b' + byte(i)}, peers[i+1], peers[i+1][0])
}

for c := 'a'; c <= 'e'; c++ {
loc, err := s.cache.LocateKey(s.bo, []byte{byte(c)})
s.Nil(err)
s.Equal(loc.Region.GetID(), regions[c-'a'])
}

// merge all except the last region together
for i := 1; i <= 3; i++ {
s.cluster.Merge(regions[0], regions[i])
}

// Now the region cache contains stale information
loc, err := s.cache.LocateKey(s.bo, []byte{'c'})
s.Nil(err)
s.NotEqual(loc.Region.GetID(), regions[0]) // This is incorrect, but is expected
loc, err = s.cache.LocateKey(s.bo, []byte{'e'})
s.Nil(err)
s.Equal(loc.Region.GetID(), regions[4]) // 'e' is not merged yet, so it's still correct

// If we insert the new region into the cache, the old intersecting regions will be removed.
// And the result will be correct.
region, err := s.cache.loadRegion(s.bo, []byte("c"), false)
s.Nil(err)
s.Equal(region.GetID(), regions[0])
s.cache.insertRegionToCache(region)
loc, err = s.cache.LocateKey(s.bo, []byte{'c'})
s.Nil(err)
s.Equal(loc.Region.GetID(), regions[0])
s.checkCache(2)

// Now, we merge the last region. This case tests against how we handle the empty end_key.
s.cluster.Merge(regions[0], regions[4])
region, err = s.cache.loadRegion(s.bo, []byte("e"), false)
s.Nil(err)
s.Equal(region.GetID(), regions[0])
s.cache.insertRegionToCache(region)
loc, err = s.cache.LocateKey(s.bo, []byte{'e'})
s.Nil(err)
s.Equal(loc.Region.GetID(), regions[0])
s.checkCache(1)
}
2 changes: 2 additions & 0 deletions txnkv/transaction/2pc.go
@@ -434,9 +434,11 @@ func newTwoPhaseCommitter(txn *KVTxn, sessionID uint64) (*twoPhaseCommitter, err
}

func (c *twoPhaseCommitter) extractKeyExistsErr(err *tikverr.ErrKeyExist) error {
c.txn.GetMemBuffer().RLock()
if !c.txn.us.HasPresumeKeyNotExists(err.GetKey()) {
return errors.Errorf("session %d, existErr for key:%s should not be nil", c.sessionID, err.GetKey())
}
c.txn.GetMemBuffer().RUnlock()
return errors.WithStack(err)
}

2 changes: 2 additions & 0 deletions txnkv/transaction/pessimistic.go
@@ -86,6 +86,7 @@ func (actionPessimisticRollback) tiKVTxnRegionsNumHistogram() prometheus.Observe
func (action actionPessimisticLock) handleSingleBatch(c *twoPhaseCommitter, bo *retry.Backoffer, batch batchMutations) error {
m := batch.mutations
mutations := make([]*kvrpcpb.Mutation, m.Len())
c.txn.GetMemBuffer().RLock()
for i := 0; i < m.Len(); i++ {
mut := &kvrpcpb.Mutation{
Op: kvrpcpb.Op_PessimisticLock,
@@ -96,6 +97,7 @@ func (action actionPessimisticLock) handleSingleBatch(c *twoPhaseCommitter, bo *
}
mutations[i] = mut
}
c.txn.GetMemBuffer().RUnlock()
req := tikvrpc.NewRequest(tikvrpc.CmdPessimisticLock, &kvrpcpb.PessimisticLockRequest{
Mutations: mutations,
PrimaryLock: c.primary(),
15 changes: 14 additions & 1 deletion txnkv/transaction/txn.go
@@ -612,7 +612,10 @@ func (txn *KVTxn) LockKeys(ctx context.Context, lockCtx *tikv.LockCtx, keysInput
}
}
}()

memBuf := txn.us.GetMemBuffer()
// Avoid data race with concurrent updates to the memBuf
memBuf.RLock()
for _, key := range keysInput {
// The value of lockedMap is only used by pessimistic transactions.
var valueExist, locked, checkKeyExists bool
@@ -627,6 +630,7 @@ func (txn *KVTxn) LockKeys(ctx context.Context, lockCtx *tikv.LockCtx, keysInput
if checkKeyExists && valueExist {
alreadyExist := kvrpcpb.AlreadyExist{Key: key}
e := &tikverr.ErrKeyExist{AlreadyExist: &alreadyExist}
memBuf.RUnlock()
return txn.committer.extractKeyExistsErr(e)
}
}
@@ -636,6 +640,8 @@ func (txn *KVTxn) LockKeys(ctx context.Context, lockCtx *tikv.LockCtx, keysInput
lockCtx.Values[string(key)] = tikv.ReturnedValue{AlreadyLocked: true}
}
}
memBuf.RUnlock()

if len(keys) == 0 {
return nil
}
@@ -683,11 +689,18 @@ func (txn *KVTxn) LockKeys(ctx context.Context, lockCtx *tikv.LockCtx, keysInput
atomic.CompareAndSwapUint32(lockCtx.Killed, 1, 0)
}
if err != nil {
var unmarkKeys [][]byte
// Avoid data race with concurrent updates to the memBuf
memBuf.RLock()
for _, key := range keys {
if txn.us.HasPresumeKeyNotExists(key) {
txn.us.UnmarkPresumeKeyNotExists(key)
unmarkKeys = append(unmarkKeys, key)
}
}
memBuf.RUnlock()
for _, key := range unmarkKeys {
txn.us.UnmarkPresumeKeyNotExists(key)
}
keyMayBeLocked := !(tikverr.IsErrWriteConflict(err) || tikverr.IsErrKeyExist(err))
// If there is only 1 key and lock fails, no need to do pessimistic rollback.
if len(keys) > 1 || keyMayBeLocked {
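
The changes to 2pc.go, pessimistic.go, and txn.go share one theme: any loop that reads the transaction's memory buffer now holds its read lock, and mutations (such as unmarking presume-not-exists keys in LockKeys) are deferred until after the lock is released. The snippet below is a minimal sketch of that pattern with a stand-in buffer guarded by sync.RWMutex; memBuffer, SetFlag, and CollectFlagged are illustrative names, not the client-go MemBuffer API.

package main

import (
	"fmt"
	"sync"
)

type memBuffer struct {
	mu    sync.RWMutex
	flags map[string]bool // e.g. "presume key not exists" flags
}

// SetFlag is the writer side; it takes the exclusive lock.
func (b *memBuffer) SetFlag(key string) {
	b.mu.Lock()
	defer b.mu.Unlock()
	b.flags[key] = true
}

// CollectFlagged is the reader side, analogous to the loops in LockKeys: it holds
// only the read lock while scanning and leaves any mutation to the caller.
func (b *memBuffer) CollectFlagged(keys []string) []string {
	b.mu.RLock()
	defer b.mu.RUnlock()
	var flagged []string
	for _, k := range keys {
		if b.flags[k] {
			flagged = append(flagged, k)
		}
	}
	return flagged
}

func main() {
	buf := &memBuffer{flags: make(map[string]bool)}
	var wg sync.WaitGroup
	// Writer and reader run concurrently; the RWMutex keeps the map access race-free.
	wg.Add(2)
	go func() { defer wg.Done(); buf.SetFlag("k1") }()
	go func() { defer wg.Done(); _ = buf.CollectFlagged([]string{"k1", "k2"}) }()
	wg.Wait()
	fmt.Println(buf.CollectFlagged([]string{"k1", "k2"})) // [k1] once the writer has run
}

Removing the RLock/RUnlock pair and running this under Go's race detector (the -race flag) reproduces the kind of concurrent read/write report that #585 closes in LockKeys.
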
