Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

raft: postpone MsgReadIndex until first commit in the term #12762

Merged
merged 1 commit into from
Mar 23, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions raft/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -183,6 +183,8 @@ type Node interface {
// Read state has a read index. Once the application advances further than the read
// index, any linearizable read requests issued before the read request can be
// processed safely. The read state will have the same rctx attached.
// Note that request can be lost without notice, therefore it is user's job
// to ensure read index retries.
ReadIndex(ctx context.Context, rctx []byte) error

// Status returns the current status of the raft state machine.
Expand Down
62 changes: 46 additions & 16 deletions raft/raft.go
Original file line number Diff line number Diff line change
Expand Up @@ -307,6 +307,12 @@ type raft struct {
step stepFunc

logger Logger

// pendingReadIndexMessages is used to store messages of type MsgReadIndex
// that can't be answered as new leader didn't committed any log in
// current term. Those will be handled as fast as first log is committed in
// current term.
pendingReadIndexMessages []pb.Message
}

func newRaft(c *Config) *raft {
Expand Down Expand Up @@ -1072,26 +1078,15 @@ func stepLeader(r *raft, m pb.Message) error {
return nil
}

// Reject read only request when this leader has not committed any log entry at its term.
// Postpone read only request when this leader has not committed
// any log entry at its term.
if !r.committedEntryInCurrentTerm() {
r.pendingReadIndexMessages = append(r.pendingReadIndexMessages, m)
return nil
}

// thinking: use an interally defined context instead of the user given context.
// We can express this in terms of the term and index instead of a user-supplied value.
// This would allow multiple reads to piggyback on the same message.
switch r.readOnly.option {
// If more than the local vote is needed, go through a full broadcast.
case ReadOnlySafe:
r.readOnly.addRequest(r.raftLog.committed, m)
// The local node automatically acks the request.
r.readOnly.recvAck(r.id, m.Entries[0].Data)
r.bcastHeartbeatWithCtx(m.Entries[0].Data)
case ReadOnlyLeaseBased:
if resp := r.responseToReadIndexReq(m, r.raftLog.committed); resp.To != None {
r.send(resp)
}
}
sendMsgReadIndexResponse(r, m)

return nil
}

Expand Down Expand Up @@ -1256,6 +1251,9 @@ func stepLeader(r *raft, m pb.Message) error {
}

if r.maybeCommit() {
// committed index has progressed for the term, so it is safe
// to respond to pending read index requests
releasePendingReadIndexMessages(r)
r.bcastAppend()
} else if oldPaused {
// If we were paused before, this node may be missing the
Expand Down Expand Up @@ -1805,3 +1803,35 @@ func numOfPendingConf(ents []pb.Entry) int {
}
return n
}

func releasePendingReadIndexMessages(r *raft) {
if !r.committedEntryInCurrentTerm() {
r.logger.Error("pending MsgReadIndex should be released only after first commit in current term")
return
}

msgs := r.pendingReadIndexMessages
r.pendingReadIndexMessages = nil
ptabor marked this conversation as resolved.
Show resolved Hide resolved

for _, m := range msgs {
sendMsgReadIndexResponse(r, m)
}
}

func sendMsgReadIndexResponse(r *raft, m pb.Message) {
// thinking: use an internally defined context instead of the user given context.
// We can express this in terms of the term and index instead of a user-supplied value.
// This would allow multiple reads to piggyback on the same message.
switch r.readOnly.option {
// If more than the local vote is needed, go through a full broadcast.
case ReadOnlySafe:
r.readOnly.addRequest(r.raftLog.committed, m)
// The local node automatically acks the request.
r.readOnly.recvAck(r.id, m.Entries[0].Data)
r.bcastHeartbeatWithCtx(m.Entries[0].Data)
case ReadOnlyLeaseBased:
if resp := r.responseToReadIndexReq(m, r.raftLog.committed); resp.To != None {
r.send(resp)
}
}
}
16 changes: 14 additions & 2 deletions raft/raft_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2397,8 +2397,7 @@ func TestReadOnlyForNewLeader(t *testing.T) {
t.Fatalf("last log term = %d, want %d", lastLogTerm, sm.Term)
}

// Ensure peer a accepts read only request after it commits a entry at its term.
nt.send(pb.Message{From: 1, To: 1, Type: pb.MsgReadIndex, Entries: []pb.Entry{{Data: wctx}}})
// Ensure peer a processed postponed read only request after it committed an entry at its term.
if len(sm.readStates) != 1 {
t.Fatalf("len(readStates) = %d, want 1", len(sm.readStates))
}
Expand All @@ -2409,6 +2408,19 @@ func TestReadOnlyForNewLeader(t *testing.T) {
if !bytes.Equal(rs.RequestCtx, wctx) {
t.Fatalf("requestCtx = %v, want %v", rs.RequestCtx, wctx)
}

// Ensure peer a accepts read only request after it committed an entry at its term.
nt.send(pb.Message{From: 1, To: 1, Type: pb.MsgReadIndex, Entries: []pb.Entry{{Data: wctx}}})
if len(sm.readStates) != 2 {
t.Fatalf("len(readStates) = %d, want 2", len(sm.readStates))
}
rs = sm.readStates[1]
if rs.Index != windex {
t.Fatalf("readIndex = %d, want %d", rs.Index, windex)
}
if !bytes.Equal(rs.RequestCtx, wctx) {
t.Fatalf("requestCtx = %v, want %v", rs.RequestCtx, wctx)
}
}

func TestLeaderAppResp(t *testing.T) {
Expand Down