From 4b21e3838116d15490530d497fbd617131af96f6 Mon Sep 17 00:00:00 2001 From: wpedrak Date: Mon, 15 Mar 2021 14:26:25 +0100 Subject: [PATCH] refactored l-read loop in v3_server.go --- server/etcdserver/v3_server.go | 156 ++++++++++++++++++++------------- 1 file changed, 94 insertions(+), 62 deletions(-) diff --git a/server/etcdserver/v3_server.go b/server/etcdserver/v3_server.go index 45d79948c59..9fbefc104ee 100644 --- a/server/etcdserver/v3_server.go +++ b/server/etcdserver/v3_server.go @@ -706,12 +706,8 @@ func (s *EtcdServer) processInternalRaftRequestOnce(ctx context.Context, r pb.In func (s *EtcdServer) Watchable() mvcc.WatchableKV { return s.KV() } func (s *EtcdServer) linearizableReadLoop() { - var rs raft.ReadState - for { - ctxToSend := make([]byte, 8) - id1 := s.reqIDGen.Next() - binary.BigEndian.PutUint64(ctxToSend, id1) + requestId := s.reqIDGen.Next() leaderChangedNotifier := s.LeaderChangedNotify() select { case <-leaderChangedNotifier: @@ -724,82 +720,37 @@ func (s *EtcdServer) linearizableReadLoop() { // as a single loop is can unlock multiple reads, it is not very useful // to propagate the trace from Txn or Range. trace := traceutil.New("linearizableReadLoop", s.Logger()) - nextnr := newNotifier() + nextnr := newNotifier() s.readMu.Lock() nr := s.readNotifier s.readNotifier = nextnr s.readMu.Unlock() - lg := s.Logger() - cctx, cancel := context.WithTimeout(context.Background(), s.Cfg.ReqTimeout()) - if err := s.r.ReadIndex(cctx, ctxToSend); err != nil { - cancel() - if err == raft.ErrStopped { - return - } - lg.Warn("failed to get read index from Raft", zap.Error(err)) - readIndexFailed.Inc() + confirmedIndex, err := s.requestCurrentIndex(leaderChangedNotifier, requestId) + if isStopped(err) { + return + } + if err != nil { nr.notify(err) continue } - cancel() - var ( - timeout bool - done bool - ) - for !timeout && !done { - select { - case rs = <-s.r.readStateC: - done = bytes.Equal(rs.RequestCtx, ctxToSend) - if !done { - // a previous request might time out. now we should ignore the response of it and - // continue waiting for the response of the current requests. - id2 := uint64(0) - if len(rs.RequestCtx) == 8 { - id2 = binary.BigEndian.Uint64(rs.RequestCtx) - } - lg.Warn( - "ignored out-of-date read index response; local node read indexes queueing up and waiting to be in sync with leader", - zap.Uint64("sent-request-id", id1), - zap.Uint64("received-request-id", id2), - ) - slowReadIndex.Inc() - } - case <-leaderChangedNotifier: - timeout = true - readIndexFailed.Inc() - // return a retryable error. - nr.notify(ErrLeaderChanged) - case <-time.After(s.Cfg.ReqTimeout()): - lg.Warn("timed out waiting for read index response (local node might have slow network)", zap.Duration("timeout", s.Cfg.ReqTimeout())) - nr.notify(ErrTimeout) - timeout = true - slowReadIndex.Inc() - case <-s.stopping: - return - } - } - if !done { - continue - } trace.Step("read index received") - index := rs.Index - trace.AddField(traceutil.Field{Key: "readStateIndex", Value: index}) + trace.AddField(traceutil.Field{Key: "readStateIndex", Value: confirmedIndex}) - ai := s.getAppliedIndex() - trace.AddField(traceutil.Field{Key: "appliedIndex", Value: strconv.FormatUint(ai, 10)}) + appliedIndex := s.getAppliedIndex() + trace.AddField(traceutil.Field{Key: "appliedIndex", Value: strconv.FormatUint(appliedIndex, 10)}) - if ai < index { + if appliedIndex < confirmedIndex { select { - case <-s.applyWait.Wait(index): + case <-s.applyWait.Wait(confirmedIndex): case <-s.stopping: return } } - // unblock all l-reads requested at indices before rs.Index + // unblock all l-reads requested at indices before confirmedIndex nr.notify(nil) trace.Step("applied index is now lower than readState.Index") @@ -807,6 +758,87 @@ func (s *EtcdServer) linearizableReadLoop() { } } +func isStopped(err error) bool { + return err == raft.ErrStopped || err == ErrStopped +} + +func (s *EtcdServer) requestCurrentIndex(leaderChangedNotifier <-chan struct{}, requestId uint64) (uint64, error) { + err := s.sendReadIndex(requestId) + if err != nil { + return 0, err + } + + confirmedIndex, err := s.readReadIndexResponse(leaderChangedNotifier, requestId) + if err != nil { + return 0, err + } + return confirmedIndex, nil +} + +func (s *EtcdServer) readReadIndexResponse(leaderChangedNotifier <-chan struct{}, requestId uint64) (uint64, error) { + lg := s.Logger() + for { + select { + case rs := <-s.r.readStateC: + requestIdBytes := unit64ToBigEndianBytes(requestId) + gotOwnResponse := bytes.Equal(rs.RequestCtx, requestIdBytes) + if !gotOwnResponse { + // a previous request might time out. now we should ignore the response of it and + // continue waiting for the response of the current requests. + responseId := uint64(0) + if len(rs.RequestCtx) == 8 { + responseId = binary.BigEndian.Uint64(rs.RequestCtx) + } + lg.Warn( + "ignored out-of-date read index response; local node read indexes queueing up and waiting to be in sync with leader", + zap.Uint64("sent-request-id", requestId), + zap.Uint64("received-request-id", responseId), + ) + slowReadIndex.Inc() + continue + } + return rs.Index, nil + case <-leaderChangedNotifier: + readIndexFailed.Inc() + // return a retryable error. + return 0, ErrLeaderChanged + case <-time.After(s.Cfg.ReqTimeout()): + lg.Warn( + "timed out waiting for read index response (local node might have slow network)", + zap.Duration("timeout", s.Cfg.ReqTimeout()), + ) + slowReadIndex.Inc() + return 0, ErrTimeout + case <-s.stopping: + return 0, ErrStopped + } + } +} + +func unit64ToBigEndianBytes(number uint64) []byte { + byteResult := make([]byte, 8) + binary.BigEndian.PutUint64(byteResult, number) + return byteResult +} + +func (s *EtcdServer) sendReadIndex(requestIndex uint64) error { + ctxToSend := unit64ToBigEndianBytes(requestIndex) + + cctx, cancel := context.WithTimeout(context.Background(), s.Cfg.ReqTimeout()) + err := s.r.ReadIndex(cctx, ctxToSend) + cancel() + if err == raft.ErrStopped { + return err + } + if err != nil { + lg := s.Logger() + lg.Warn("failed to get read index from Raft", zap.Error(err)) + readIndexFailed.Inc() + return err + } + return nil +} + func (s *EtcdServer) LinearizableReadNotify(ctx context.Context) error { return s.linearizableReadNotify(ctx) }