Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Enhancement: persist commit index in LogStore to accelerate recovery #613

Open
wants to merge 20 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 5 commits
Commits
Show all changes
20 commits
Select commit Hold shift + click to select a range
2e5a8a0
feat: add CommitTrackingLogStore interface for commit index management
peterxcli Sep 1, 2024
ffc6b3b
chore: remove non-idiomatic type assert func
peterxcli Sep 3, 2024
7383d96
feat(raft): add fast recovery mode for quicker log application
peterxcli Sep 4, 2024
f6295e0
feat(raft): add recovery from committed logs during startup
peterxcli Sep 4, 2024
f2ae7a9
refactor(store): rename ReadCommitIndex to GetCommitIndex for consist…
peterxcli Sep 6, 2024
ce1895c
fix: also set inmem commit index when revocer log commit progress fro…
peterxcli Sep 10, 2024
ab50a58
perf: optimize startup recovery by skipping duplicated log replay
peterxcli Sep 10, 2024
4e7e04b
refactor(inmem-commit-tracking-store): store commit index in memory u…
peterxcli Sep 13, 2024
41df55e
chore: fix typo in recoverFromCommittedLogs function name
peterxcli Sep 13, 2024
400a27d
refactor(raft): update parameter name in persistCommitIndex function
peterxcli Sep 13, 2024
e2617e8
refactor(raft): set commit index in memory before `StoreLogs`
peterxcli Sep 13, 2024
6daca47
refactor(raft): fix condition for skipping recovery in `recoverFromCo…
peterxcli Sep 18, 2024
cc09317
feat(raft): add commit tracking logs and fast recovery tests
peterxcli Sep 18, 2024
fe57b32
docs(config): update comments for FastRecovery mechanism
peterxcli Sep 19, 2024
20e8701
refactor(inmem-commit-tracking-store): simplify in-mem log tracking s…
peterxcli Sep 19, 2024
6f146e1
fix: rename persistCommitIndex to tryPersistCommitIndex
peterxcli Sep 19, 2024
a8438b0
chore(raft): rename tryPersistCommitIndex to tryStageCommitIndex for …
peterxcli Sep 20, 2024
5e6d8a4
refactor(log): introduce StagCommitIndex for optimized atomic persist…
peterxcli Sep 20, 2024
e248f00
fix(raft): correct CommitTrackingLogStore implementation
peterxcli Sep 24, 2024
2a913ab
feat(raft): improve fast recovery error handling and commit index val…
peterxcli Sep 24, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 30 additions & 0 deletions api.go
Original file line number Diff line number Diff line change
Expand Up @@ -217,6 +217,10 @@ type Raft struct {
// preVoteDisabled control if the pre-vote feature is activated,
// prevote feature is disabled if set to true.
preVoteDisabled bool

// fastRecovery enables fast recovery mode when set to true;
// fast recovery mode is disabled if set to false.
fastRecovery bool
}

// BootstrapCluster initializes a server's storage with the given cluster
Expand Down Expand Up @@ -566,6 +570,7 @@ func NewRaft(conf *Config, fsm FSM, logs LogStore, stable StableStore, snaps Sna
followerNotifyCh: make(chan struct{}, 1),
mainThreadSaturation: newSaturationMetric([]string{"raft", "thread", "main", "saturation"}, 1*time.Second),
preVoteDisabled: conf.PreVoteDisabled || !transportSupportPreVote,
fastRecovery: conf.FastRecovery,
}
if !transportSupportPreVote && !conf.PreVoteDisabled {
r.logger.Warn("pre-vote is disabled because it is not supported by the Transport")
Expand Down Expand Up @@ -606,6 +611,8 @@ func NewRaft(conf *Config, fsm FSM, logs LogStore, stable StableStore, snaps Sna
// to be called concurrently with a blocking RPC.
trans.SetHeartbeatHandler(r.processHeartbeat)

r.recoverFromCommitedLogs()
lalalalatt marked this conversation as resolved.
Show resolved Hide resolved

if conf.skipStartup {
return r, nil
}
Expand Down Expand Up @@ -697,6 +704,29 @@ func (r *Raft) tryRestoreSingleSnapshot(snapshot *SnapshotMeta) bool {
return true
}

// recoverFromCommitedLogs recovers the Raft node from committed logs.
func (r *Raft) recoverFromCommitedLogs() error {
if !r.fastRecovery {
return nil
}
// If the store implements CommitTrackingLogStore, we can read the commit index from the store.
// This is useful when the store is able to track the commit index and we can avoid replaying logs.
store, ok := r.logs.(CommitTrackingLogStore)
if !ok {
return nil
}
commitIndex, err := store.GetCommitIndex()
if err != nil {
return fmt.Errorf("failed to read commit index from store: %w", err)
}
if commitIndex == 0 {
return nil
}

r.processLogs(commitIndex, nil)
lalalalatt marked this conversation as resolved.
Show resolved Hide resolved
return nil
}

// config returns the Config currently held in r.conf. The type assertion
// panics if the loaded value is not a Config.
func (r *Raft) config() Config {
	current := r.conf.Load()
	return current.(Config)
}
Expand Down
5 changes: 5 additions & 0 deletions config.go
Original file line number Diff line number Diff line change
Expand Up @@ -235,6 +235,11 @@ type Config struct {
// PreVoteDisabled deactivate the pre-vote feature when set to true
PreVoteDisabled bool

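// FastRecovery controls if the Raft server should use the fast recovery
// mechanism. When enabled, and the LogStore also implements
// CommitTrackingLogStore, the server persists its commit index to the store
// and, in NewRaft, applies logs to the FSM up to the last persisted commit
// index.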
lalalalatt marked this conversation as resolved.
Show resolved Hide resolved
FastRecovery bool

// skipStartup allows NewRaft() to bypass all background work goroutines
skipStartup bool
}
Expand Down
28 changes: 28 additions & 0 deletions inmem_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -131,3 +131,31 @@ func (i *InmemStore) GetUint64(key []byte) (uint64, error) {
defer i.l.RUnlock()
return i.kvInt[string(key)], nil
}

type InmemCommitTrackingStore struct {
lalalalatt marked this conversation as resolved.
Show resolved Hide resolved
*InmemStore
commitIndexLock sync.RWMutex
commitIndex uint64
}

// NewInmemCommitTrackingStore returns a new in-memory backend that tracks the commit index. Do not ever
// use for production. Only for testing.
func NewInmemCommitTrackingStore() *InmemCommitTrackingStore {
i := &InmemCommitTrackingStore{
InmemStore: NewInmemStore(),
}
return i
}

func (i *InmemCommitTrackingStore) SetCommitIndex(index uint64) error {
lalalalatt marked this conversation as resolved.
Show resolved Hide resolved
i.commitIndexLock.Lock()
defer i.commitIndexLock.Unlock()
i.commitIndex = index
return nil
}

func (i *InmemCommitTrackingStore) GetCommitIndex() (uint64, error) {
i.commitIndexLock.RLock()
defer i.commitIndexLock.RUnlock()
return i.commitIndex, nil
}
lalalalatt marked this conversation as resolved.
Show resolved Hide resolved
5 changes: 5 additions & 0 deletions log.go
Original file line number Diff line number Diff line change
Expand Up @@ -190,3 +190,8 @@ func emitLogStoreMetrics(s LogStore, prefix []string, interval time.Duration, st
}
}
}

// CommitTrackingLogStore is the optional interface a log store implements to
// record a commit index and hand it back later. Callers discover support by
// type-asserting their LogStore to this interface.
type CommitTrackingLogStore interface {
// SetCommitIndex records idx as the store's commit index.
SetCommitIndex(idx uint64) error
// GetCommitIndex returns the recorded commit index.
// NOTE(review): callers appear to treat 0 as "nothing recorded" — confirm
// implementations return 0 before the first SetCommitIndex.
GetCommitIndex() (uint64, error)
}
16 changes: 16 additions & 0 deletions raft.go
Original file line number Diff line number Diff line change
Expand Up @@ -1354,6 +1354,8 @@ func (r *Raft) processLogs(index uint64, futures map[uint64]*logFuture) {
applyBatch(batch)
}

r.persistCommitIndex(index)

lalalalatt marked this conversation as resolved.
Show resolved Hide resolved
// Update the lastApplied index and term
r.setLastApplied(index)
}
Expand Down Expand Up @@ -1385,6 +1387,20 @@ func (r *Raft) prepareLog(l *Log, future *logFuture) *commitTuple {
return nil
}

// persistCommitIndex hands index to the log store's SetCommitIndex when fast
// recovery is enabled and r.logs implements CommitTrackingLogStore; otherwise
// it does nothing. A store error is logged and not returned to the caller.
func (r *Raft) persistCommitIndex(index uint64) {
	if !r.fastRecovery {
		return
	}
	if tracker, tracks := r.logs.(CommitTrackingLogStore); tracks {
		if err := tracker.SetCommitIndex(index); err != nil {
			r.logger.Error("failed to set commit index in commit tracking log store", "index", index, "error", err)
		}
	}
}

// processRPC is called to handle an incoming RPC request. This must only be
// called from the main thread.
func (r *Raft) processRPC(rpc RPC) {
Expand Down