Skip to content

Commit

Permalink
fix the race condition between goroutine and channel on the same leas…
Browse files Browse the repository at this point in the history
…es to be revoked
  • Loading branch information
ahrtr committed May 24, 2022
1 parent c3bc411 commit 112a5da
Showing 1 changed file with 29 additions and 26 deletions.
55 changes: 29 additions & 26 deletions server/etcdserver/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -842,33 +842,36 @@ func (s *EtcdServer) run() {
f := func(context.Context) { s.applyAll(&ep, &ap) }
sched.Schedule(f)
case leases := <-expiredLeaseC:
s.GoAttach(func() {
// Increases throughput of expired leases deletion process through parallelization
c := make(chan struct{}, maxPendingRevokes)
for _, lease := range leases {
select {
case c <- struct{}{}:
case <-s.stopping:
return
}
lid := lease.ID
s.GoAttach(func() {
ctx := s.authStore.WithRoot(s.ctx)
_, lerr := s.LeaseRevoke(ctx, &pb.LeaseRevokeRequest{ID: int64(lid)})
if lerr == nil {
leaseExpired.Inc()
} else {
lg.Warn(
"failed to revoke lease",
zap.String("lease-id", fmt.Sprintf("%016x", lid)),
zap.Error(lerr),
)
f := func(leases []*lease.Lease) {
s.GoAttach(func() {
// Increases throughput of expired leases deletion process through parallelization
c := make(chan struct{}, maxPendingRevokes)
for _, lease := range leases {
select {
case c <- struct{}{}:
case <-s.stopping:
return
}

<-c
})
}
})
lid := lease.ID
s.GoAttach(func() {
ctx := s.authStore.WithRoot(s.ctx)
_, lerr := s.LeaseRevoke(ctx, &pb.LeaseRevokeRequest{ID: int64(lid)})
if lerr == nil {
leaseExpired.Inc()
} else {
lg.Warn(
"failed to revoke lease",
zap.String("lease-id", fmt.Sprintf("%016x", lid)),
zap.Error(lerr),
)
}

<-c
})
}
})
}
f(leases)
case err := <-s.errorc:
lg.Warn("server error", zap.Error(err))
lg.Warn("data-dir used by this member must be removed")
Expand Down

0 comments on commit 112a5da

Please sign in to comment.