Skip to content

Commit

Permalink
refactored l-read loop in v3_server.go
Browse files Browse the repository at this point in the history
  • Loading branch information
wpedrak committed Mar 16, 2021
1 parent e599f4a commit 4b21e38
Showing 1 changed file with 94 additions and 62 deletions.
156 changes: 94 additions & 62 deletions server/etcdserver/v3_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -706,12 +706,8 @@ func (s *EtcdServer) processInternalRaftRequestOnce(ctx context.Context, r pb.In
func (s *EtcdServer) Watchable() mvcc.WatchableKV { return s.KV() }

func (s *EtcdServer) linearizableReadLoop() {
var rs raft.ReadState

for {
ctxToSend := make([]byte, 8)
id1 := s.reqIDGen.Next()
binary.BigEndian.PutUint64(ctxToSend, id1)
requestId := s.reqIDGen.Next()
leaderChangedNotifier := s.LeaderChangedNotify()
select {
case <-leaderChangedNotifier:
Expand All @@ -724,89 +720,125 @@ func (s *EtcdServer) linearizableReadLoop() {
// as a single loop is can unlock multiple reads, it is not very useful
// to propagate the trace from Txn or Range.
trace := traceutil.New("linearizableReadLoop", s.Logger())
nextnr := newNotifier()

nextnr := newNotifier()
s.readMu.Lock()
nr := s.readNotifier
s.readNotifier = nextnr
s.readMu.Unlock()

lg := s.Logger()
cctx, cancel := context.WithTimeout(context.Background(), s.Cfg.ReqTimeout())
if err := s.r.ReadIndex(cctx, ctxToSend); err != nil {
cancel()
if err == raft.ErrStopped {
return
}
lg.Warn("failed to get read index from Raft", zap.Error(err))
readIndexFailed.Inc()
confirmedIndex, err := s.requestCurrentIndex(leaderChangedNotifier, requestId)
if isStopped(err) {
return
}
if err != nil {
nr.notify(err)
continue
}
cancel()

var (
timeout bool
done bool
)
for !timeout && !done {
select {
case rs = <-s.r.readStateC:
done = bytes.Equal(rs.RequestCtx, ctxToSend)
if !done {
// a previous request might time out. now we should ignore the response of it and
// continue waiting for the response of the current requests.
id2 := uint64(0)
if len(rs.RequestCtx) == 8 {
id2 = binary.BigEndian.Uint64(rs.RequestCtx)
}
lg.Warn(
"ignored out-of-date read index response; local node read indexes queueing up and waiting to be in sync with leader",
zap.Uint64("sent-request-id", id1),
zap.Uint64("received-request-id", id2),
)
slowReadIndex.Inc()
}
case <-leaderChangedNotifier:
timeout = true
readIndexFailed.Inc()
// return a retryable error.
nr.notify(ErrLeaderChanged)
case <-time.After(s.Cfg.ReqTimeout()):
lg.Warn("timed out waiting for read index response (local node might have slow network)", zap.Duration("timeout", s.Cfg.ReqTimeout()))
nr.notify(ErrTimeout)
timeout = true
slowReadIndex.Inc()
case <-s.stopping:
return
}
}
if !done {
continue
}
trace.Step("read index received")

index := rs.Index
trace.AddField(traceutil.Field{Key: "readStateIndex", Value: index})
trace.AddField(traceutil.Field{Key: "readStateIndex", Value: confirmedIndex})

ai := s.getAppliedIndex()
trace.AddField(traceutil.Field{Key: "appliedIndex", Value: strconv.FormatUint(ai, 10)})
appliedIndex := s.getAppliedIndex()
trace.AddField(traceutil.Field{Key: "appliedIndex", Value: strconv.FormatUint(appliedIndex, 10)})

if ai < index {
if appliedIndex < confirmedIndex {
select {
case <-s.applyWait.Wait(index):
case <-s.applyWait.Wait(confirmedIndex):
case <-s.stopping:
return
}
}
// unblock all l-reads requested at indices before rs.Index
// unblock all l-reads requested at indices before confirmedIndex
nr.notify(nil)
trace.Step("applied index is now lower than readState.Index")

trace.LogAllStepsIfLong(traceThreshold)
}
}

func isStopped(err error) bool {
return err == raft.ErrStopped || err == ErrStopped
}

func (s *EtcdServer) requestCurrentIndex(leaderChangedNotifier <-chan struct{}, requestId uint64) (uint64, error) {
err := s.sendReadIndex(requestId)
if err != nil {
return 0, err
}

confirmedIndex, err := s.readReadIndexResponse(leaderChangedNotifier, requestId)
if err != nil {
return 0, err
}
return confirmedIndex, nil
}

func (s *EtcdServer) readReadIndexResponse(leaderChangedNotifier <-chan struct{}, requestId uint64) (uint64, error) {
lg := s.Logger()
for {
select {
case rs := <-s.r.readStateC:
requestIdBytes := unit64ToBigEndianBytes(requestId)
gotOwnResponse := bytes.Equal(rs.RequestCtx, requestIdBytes)
if !gotOwnResponse {
// a previous request might time out. now we should ignore the response of it and
// continue waiting for the response of the current requests.
responseId := uint64(0)
if len(rs.RequestCtx) == 8 {
responseId = binary.BigEndian.Uint64(rs.RequestCtx)
}
lg.Warn(
"ignored out-of-date read index response; local node read indexes queueing up and waiting to be in sync with leader",
zap.Uint64("sent-request-id", requestId),
zap.Uint64("received-request-id", responseId),
)
slowReadIndex.Inc()
continue
}
return rs.Index, nil
case <-leaderChangedNotifier:
readIndexFailed.Inc()
// return a retryable error.
return 0, ErrLeaderChanged
case <-time.After(s.Cfg.ReqTimeout()):
lg.Warn(
"timed out waiting for read index response (local node might have slow network)",
zap.Duration("timeout", s.Cfg.ReqTimeout()),
)
slowReadIndex.Inc()
return 0, ErrTimeout
case <-s.stopping:
return 0, ErrStopped
}
}
}

func unit64ToBigEndianBytes(number uint64) []byte {
byteResult := make([]byte, 8)
binary.BigEndian.PutUint64(byteResult, number)
return byteResult
}

func (s *EtcdServer) sendReadIndex(requestIndex uint64) error {
ctxToSend := unit64ToBigEndianBytes(requestIndex)

cctx, cancel := context.WithTimeout(context.Background(), s.Cfg.ReqTimeout())
err := s.r.ReadIndex(cctx, ctxToSend)
cancel()
if err == raft.ErrStopped {
return err
}
if err != nil {
lg := s.Logger()
lg.Warn("failed to get read index from Raft", zap.Error(err))
readIndexFailed.Inc()
return err
}
return nil
}

func (s *EtcdServer) LinearizableReadNotify(ctx context.Context) error {
return s.linearizableReadNotify(ctx)
}
Expand Down

0 comments on commit 4b21e38

Please sign in to comment.