diff --git a/server/etcdserver/v3_server.go b/server/etcdserver/v3_server.go index 9fbefc104eea..187b62d71df8 100644 --- a/server/etcdserver/v3_server.go +++ b/server/etcdserver/v3_server.go @@ -44,6 +44,7 @@ const ( // We should stop accepting new proposals if the gap growing to a certain point. maxGapBetweenApplyAndCommitIndex = 5000 traceThreshold = 100 * time.Millisecond + readIndexRetryTime = 500 * time.Millisecond ) type RaftKV interface { @@ -768,19 +769,25 @@ func (s *EtcdServer) requestCurrentIndex(leaderChangedNotifier <-chan struct{}, return 0, err } - confirmedIndex, err := s.readReadIndexResponse(leaderChangedNotifier, requestId) - if err != nil { - return 0, err - } - return confirmedIndex, nil -} - -func (s *EtcdServer) readReadIndexResponse(leaderChangedNotifier <-chan struct{}, requestId uint64) (uint64, error) { lg := s.Logger() + errorTimer := time.NewTimer(s.Cfg.ReqTimeout()) + defer errorTimer.Stop() + for { + select { + case <-errorTimer.C: + lg.Warn( + "timed out waiting for read index response (local node might have slow network)", + zap.Duration("timeout", s.Cfg.ReqTimeout()), + ) + slowReadIndex.Inc() + return 0, ErrTimeout + default: + } + select { case rs := <-s.r.readStateC: - requestIdBytes := unit64ToBigEndianBytes(requestId) + requestIdBytes := uint64ToBigEndianBytes(requestId) gotOwnResponse := bytes.Equal(rs.RequestCtx, requestIdBytes) if !gotOwnResponse { // a previous request might time out. now we should ignore the response of it and @@ -802,27 +809,31 @@ func (s *EtcdServer) readReadIndexResponse(leaderChangedNotifier <-chan struct{} readIndexFailed.Inc() // return a retryable error. return 0, ErrLeaderChanged - case <-time.After(s.Cfg.ReqTimeout()): + case <-time.After(readIndexRetryTime): lg.Warn( - "timed out waiting for read index response (local node might have slow network)", - zap.Duration("timeout", s.Cfg.ReqTimeout()), + "waiting for ReadIndex response took too long, retrying", + zap.Uint64("sent-request-id", requestId), + zap.Duration("retry-timeout", readIndexRetryTime), ) - slowReadIndex.Inc() - return 0, ErrTimeout + err := s.sendReadIndex(requestId) + if err != nil { + return 0, err + } + continue case <-s.stopping: return 0, ErrStopped } } } -func unit64ToBigEndianBytes(number uint64) []byte { +func uint64ToBigEndianBytes(number uint64) []byte { byteResult := make([]byte, 8) binary.BigEndian.PutUint64(byteResult, number) return byteResult } func (s *EtcdServer) sendReadIndex(requestIndex uint64) error { - ctxToSend := unit64ToBigEndianBytes(requestIndex) + ctxToSend := uint64ToBigEndianBytes(requestIndex) cctx, cancel := context.WithTimeout(context.Background(), s.Cfg.ReqTimeout()) err := s.r.ReadIndex(cctx, ctxToSend)