Skip to content

Commit

Permalink
server: add 500ms retries to ReadIndex requests for l-reads
Browse files Browse the repository at this point in the history
It is second approach (with first being etcd-io#12762) to solve etcd-io#12680
  • Loading branch information
wpedrak committed Mar 16, 2021
1 parent 4b21e38 commit 1df6caf
Showing 1 changed file with 27 additions and 16 deletions.
43 changes: 27 additions & 16 deletions server/etcdserver/v3_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ const (
// We should stop accepting new proposals if the gap growing to a certain point.
maxGapBetweenApplyAndCommitIndex = 5000
traceThreshold = 100 * time.Millisecond
readIndexRetryTime = 500 * time.Millisecond
)

type RaftKV interface {
Expand Down Expand Up @@ -768,19 +769,25 @@ func (s *EtcdServer) requestCurrentIndex(leaderChangedNotifier <-chan struct{},
return 0, err
}

confirmedIndex, err := s.readReadIndexResponse(leaderChangedNotifier, requestId)
if err != nil {
return 0, err
}
return confirmedIndex, nil
}

func (s *EtcdServer) readReadIndexResponse(leaderChangedNotifier <-chan struct{}, requestId uint64) (uint64, error) {
lg := s.Logger()
errorTimer := time.NewTimer(s.Cfg.ReqTimeout())
defer errorTimer.Stop()

for {
select {
case <-errorTimer.C:
lg.Warn(
"timed out waiting for read index response (local node might have slow network)",
zap.Duration("timeout", s.Cfg.ReqTimeout()),
)
slowReadIndex.Inc()
return 0, ErrTimeout
default:
}

select {
case rs := <-s.r.readStateC:
requestIdBytes := unit64ToBigEndianBytes(requestId)
requestIdBytes := uint64ToBigEndianBytes(requestId)
gotOwnResponse := bytes.Equal(rs.RequestCtx, requestIdBytes)
if !gotOwnResponse {
// a previous request might time out. now we should ignore the response of it and
Expand All @@ -802,27 +809,31 @@ func (s *EtcdServer) readReadIndexResponse(leaderChangedNotifier <-chan struct{}
readIndexFailed.Inc()
// return a retryable error.
return 0, ErrLeaderChanged
case <-time.After(s.Cfg.ReqTimeout()):
case <-time.After(readIndexRetryTime):
lg.Warn(
"timed out waiting for read index response (local node might have slow network)",
zap.Duration("timeout", s.Cfg.ReqTimeout()),
"waiting for ReadIndex response took too long, retrying",
zap.Uint64("sent-request-id", requestId),
zap.Duration("retry-timeout", readIndexRetryTime),
)
slowReadIndex.Inc()
return 0, ErrTimeout
err := s.sendReadIndex(requestId)
if err != nil {
return 0, err
}
continue
case <-s.stopping:
return 0, ErrStopped
}
}
}

func unit64ToBigEndianBytes(number uint64) []byte {
func uint64ToBigEndianBytes(number uint64) []byte {
byteResult := make([]byte, 8)
binary.BigEndian.PutUint64(byteResult, number)
return byteResult
}

func (s *EtcdServer) sendReadIndex(requestIndex uint64) error {
ctxToSend := unit64ToBigEndianBytes(requestIndex)
ctxToSend := uint64ToBigEndianBytes(requestIndex)

cctx, cancel := context.WithTimeout(context.Background(), s.Cfg.ReqTimeout())
err := s.r.ReadIndex(cctx, ctxToSend)
Expand Down

0 comments on commit 1df6caf

Please sign in to comment.