From 758ff0163ccd6efd08365d4ecd8070b4b3b7bc43 Mon Sep 17 00:00:00 2001 From: wpedrak Date: Thu, 11 Mar 2021 15:39:41 +0100 Subject: [PATCH] raft: postpone MsgReadIndex until first commit in the term Fixes #12680 --- raft/node.go | 2 ++ raft/raft.go | 62 +++++++++++++++++++++++++++++++++++------------ raft/raft_test.go | 16 ++++++++++-- 3 files changed, 62 insertions(+), 18 deletions(-) diff --git a/raft/node.go b/raft/node.go index 2a8f12ba416..dca5954f7ae 100644 --- a/raft/node.go +++ b/raft/node.go @@ -183,6 +183,8 @@ type Node interface { // Read state has a read index. Once the application advances further than the read // index, any linearizable read requests issued before the read request can be // processed safely. The read state will have the same rctx attached. + // Note that the request can be lost without notice, therefore it is the user's job + // to ensure read index retries. ReadIndex(ctx context.Context, rctx []byte) error // Status returns the current status of the raft state machine. diff --git a/raft/raft.go b/raft/raft.go index 73c3ca499d0..c80262ebaf0 100644 --- a/raft/raft.go +++ b/raft/raft.go @@ -307,6 +307,12 @@ type raft struct { step stepFunc logger Logger + + // pendingReadIndexMessages is used to store messages of type MsgReadIndex + // that can't be answered because the new leader has not committed any log entry in + // the current term. Those will be handled as soon as the first log entry is committed in + // the current term. + pendingReadIndexMessages []pb.Message } func newRaft(c *Config) *raft { @@ -1072,26 +1078,15 @@ func stepLeader(r *raft, m pb.Message) error { return nil } - // Reject read only request when this leader has not committed any log entry at its term. + // Postpone read only request when this leader has not committed + // any log entry at its term. if !r.committedEntryInCurrentTerm() { + r.pendingReadIndexMessages = append(r.pendingReadIndexMessages, m) return nil } - // thinking: use an interally defined context instead of the user given context. 
- // We can express this in terms of the term and index instead of a user-supplied value. - // This would allow multiple reads to piggyback on the same message. - switch r.readOnly.option { - // If more than the local vote is needed, go through a full broadcast. - case ReadOnlySafe: - r.readOnly.addRequest(r.raftLog.committed, m) - // The local node automatically acks the request. - r.readOnly.recvAck(r.id, m.Entries[0].Data) - r.bcastHeartbeatWithCtx(m.Entries[0].Data) - case ReadOnlyLeaseBased: - if resp := r.responseToReadIndexReq(m, r.raftLog.committed); resp.To != None { - r.send(resp) - } - } + sendMsgReadIndexResponse(r, m) + return nil } @@ -1256,6 +1251,9 @@ func stepLeader(r *raft, m pb.Message) error { } if r.maybeCommit() { + // committed index has progressed for the term, so it is safe + // to respond to pending read index requests + releasePendingReadIndexMessages(r) r.bcastAppend() } else if oldPaused { // If we were paused before, this node may be missing the @@ -1805,3 +1803,35 @@ func numOfPendingConf(ents []pb.Entry) int { } return n } + +func releasePendingReadIndexMessages(r *raft) { + if !r.committedEntryInCurrentTerm() { + r.logger.Error("pending MsgReadIndex should be released only after first commit in current term") + return + } + + msgs := r.pendingReadIndexMessages + r.pendingReadIndexMessages = nil + + for _, m := range msgs { + sendMsgReadIndexResponse(r, m) + } +} + +func sendMsgReadIndexResponse(r *raft, m pb.Message) { + // thinking: use an internally defined context instead of the user given context. + // We can express this in terms of the term and index instead of a user-supplied value. + // This would allow multiple reads to piggyback on the same message. + switch r.readOnly.option { + // If more than the local vote is needed, go through a full broadcast. + case ReadOnlySafe: + r.readOnly.addRequest(r.raftLog.committed, m) + // The local node automatically acks the request. 
+ r.readOnly.recvAck(r.id, m.Entries[0].Data) + r.bcastHeartbeatWithCtx(m.Entries[0].Data) + case ReadOnlyLeaseBased: + if resp := r.responseToReadIndexReq(m, r.raftLog.committed); resp.To != None { + r.send(resp) + } + } +} diff --git a/raft/raft_test.go b/raft/raft_test.go index 32227df87f4..f21670776b7 100644 --- a/raft/raft_test.go +++ b/raft/raft_test.go @@ -2397,8 +2397,7 @@ func TestReadOnlyForNewLeader(t *testing.T) { t.Fatalf("last log term = %d, want %d", lastLogTerm, sm.Term) } - // Ensure peer a accepts read only request after it commits a entry at its term. - nt.send(pb.Message{From: 1, To: 1, Type: pb.MsgReadIndex, Entries: []pb.Entry{{Data: wctx}}}) + // Ensure peer a processed postponed read only request after it committed an entry at its term. if len(sm.readStates) != 1 { t.Fatalf("len(readStates) = %d, want 1", len(sm.readStates)) } @@ -2409,6 +2408,19 @@ func TestReadOnlyForNewLeader(t *testing.T) { if !bytes.Equal(rs.RequestCtx, wctx) { t.Fatalf("requestCtx = %v, want %v", rs.RequestCtx, wctx) } + + // Ensure peer a accepts read only request after it committed an entry at its term. + nt.send(pb.Message{From: 1, To: 1, Type: pb.MsgReadIndex, Entries: []pb.Entry{{Data: wctx}}}) + if len(sm.readStates) != 2 { + t.Fatalf("len(readStates) = %d, want 2", len(sm.readStates)) + } + rs = sm.readStates[1] + if rs.Index != windex { + t.Fatalf("readIndex = %d, want %d", rs.Index, windex) + } + if !bytes.Equal(rs.RequestCtx, wctx) { + t.Fatalf("requestCtx = %v, want %v", rs.RequestCtx, wctx) + } } func TestLeaderAppResp(t *testing.T) {