Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Enhancement: persist commit index in LogStore to accelerate recovery #613

Open
wants to merge 20 commits into
base: main
Choose a base branch
from

Conversation

lalalalatt
Copy link

@lalalalatt lalalalatt commented Sep 1, 2024

@lalalalatt lalalalatt requested review from a team as code owners September 1, 2024 08:25
@lalalalatt lalalalatt requested review from rboyer and removed request for a team September 1, 2024 08:25
Copy link

hashicorp-cla-app bot commented Sep 1, 2024

CLA assistant check
All committers have signed the CLA.

@lalalalatt
Copy link
Author

lalalalatt commented Sep 1, 2024

@banks This is first PR for #549.
Thanks for spending your time on reviewing this~

@lalalalatt
Copy link
Author

lalalalatt commented Sep 1, 2024

Proposal for next PR:
Persist commit index every time processLogs is called. (commit index changes only happened at there)

raft/raft.go

Lines 1292 to 1359 in 42d3446

func (r *Raft) processLogs(index uint64, futures map[uint64]*logFuture) {
// Reject logs we've applied already
lastApplied := r.getLastApplied()
if index <= lastApplied {
r.logger.Warn("skipping application of old log", "index", index)
return
}
applyBatch := func(batch []*commitTuple) {
select {
case r.fsmMutateCh <- batch:
case <-r.shutdownCh:
for _, cl := range batch {
if cl.future != nil {
cl.future.respond(ErrRaftShutdown)
}
}
}
}
// Store maxAppendEntries for this call in case it ever becomes reloadable. We
// need to use the same value for all lines here to get the expected result.
maxAppendEntries := r.config().MaxAppendEntries
batch := make([]*commitTuple, 0, maxAppendEntries)
// Apply all the preceding logs
for idx := lastApplied + 1; idx <= index; idx++ {
var preparedLog *commitTuple
// Get the log, either from the future or from our log store
future, futureOk := futures[idx]
if futureOk {
preparedLog = r.prepareLog(&future.log, future)
} else {
l := new(Log)
if err := r.logs.GetLog(idx, l); err != nil {
r.logger.Error("failed to get log", "index", idx, "error", err)
panic(err)
}
preparedLog = r.prepareLog(l, nil)
}
switch {
case preparedLog != nil:
// If we have a log ready to send to the FSM add it to the batch.
// The FSM thread will respond to the future.
batch = append(batch, preparedLog)
// If we have filled up a batch, send it to the FSM
if len(batch) >= maxAppendEntries {
applyBatch(batch)
batch = make([]*commitTuple, 0, maxAppendEntries)
}
case futureOk:
// Invoke the future if given.
future.respond(nil)
}
}
// If there are any remaining logs in the batch apply them
if len(batch) != 0 {
applyBatch(batch)
}
// Update the lastApplied index and term
r.setLastApplied(index)
}

func (r *Raft) processLogs(index uint64, futures map[uint64]*logFuture) { 
 	// Reject logs we've applied already 
 	lastApplied := r.getLastApplied() 
 	if index <= lastApplied { 
 		r.logger.Warn("skipping application of old log", "index", index) 
 		return 
 	} 

+       if r.fastRecovery && isCommitTrackingLogStore(r.logs) {
+               store := r.logs.(CommitTrackingLogStore)
+               if err = store.SetCommitIndex(index) {
+                       // show some error msg
+               }
+       }
  
 	....
  
 	// Update the lastApplied index and term 
 	r.setLastApplied(index) 
 }

@otoolep
Copy link
Contributor

otoolep commented Sep 1, 2024

As a long-term user of this library, this could be useful. However I would strongly recommend that this functionality be wrapped in a flag, which is settable in the Raft Config object (similar to NoSnapshotRestoreOnStart), and be disabled by default. Systems built on Raft need to be solid, so taking a conservative approach is warranted.

@lalalalatt
Copy link
Author

lalalalatt commented Sep 2, 2024

As a long-term user of this library, this could be useful. However I would strongly recommend that this functionality be wrapped in a flag, which is settable in the Raft Config object (similar to NoSnapshotRestoreOnStart), and be disabled by default. Systems built on Raft need to be solid, so taking a conservative approach is warranted.

@otoolep Thanks for the suggestion, I would add r.fastRecovery as the feature flag~

Copy link
Member

@banks banks left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for starting on this @lalalalatt.

The interface looks good. I have a comment inline and given that I'm suggesting removing half the lines in this PR, it might make sense to also add the change you proposed for your "next PR" into this one?

I also think it might be worth rebasing this PR on to a feature branch like f/fast-recovery so we can keep PRs small to review but not merge code into main until it's a working feature that can be released. (I realise this would be a no-op, but still if we don't complete the feature it will be unused code that will need later cleanup or completion.)

What do you think of that approach?

log.go Outdated Show resolved Hide resolved
@lalalalatt
Copy link
Author

I also think it might be worth rebasing this PR on to a feature branch like f/fast-recovery so we can keep PRs small to review but not merge code into main until it's a working feature that can be released. (I realise this would be a no-op, but still if we don't complete the feature it will be unused code that will need later cleanup or completion.)

@banks Sure, that sounds like a good plan. Could you help me create the f/fast-recovery branch? Thanks!

@otoolep
Copy link
Contributor

otoolep commented Sep 3, 2024

Drive-by comment.

@banks Sure, that sounds like a good plan. Could you help me create the f/fast-recovery branch? Thanks!

That's not the right way to think about the development flow (not unless this repo is managed in some unusual way). You create the branch in your own fork of this repo, and then generate a PR from that branch in your fork to the main branch in this repo.

@banks
Copy link
Member

banks commented Sep 3, 2024

@otoolep in general that is usually how GH works. In this case I wonder if we should have a long running branch here so that we can keep PRs small and review the changes in small parts rather than wait until the entire feature is built and have to review it all in one huge PR from the fork to here.

One way to do that would be for me to create a long-lived feature branch here, another would be for us to review PRs in a fork but that leaves all the interim review in a separate place 🤔 .

On reflection. I'm not sure what is gained by trying to make this many small PRs yet. Let's continue work here and see how large this PR gets as more of the code is built before we worry too much about making feature branches. Sorry for the suggestion that wasn't very well thought through!

@lalalalatt
Copy link
Author

On reflection. I'm not sure what is gained by trying to make this many small PRs yet. Let's continue work here and see how large this PR gets as more of the code is built before we worry too much about making feature branches. Sorry for the suggestion that wasn't very well thought through!

Ok, looks good!

- Introduced a `fastRecovery` flag in the Raft structure and configuration to enable fast recovery mode.
- Updated `NewRaft` to initialize `fastRecovery` from the configuration.
- Added `persistCommitIndex` function to store the commit index when fast recovery is enabled.
- Modified `processLogs` to persist the commit index before updating `lastApplied`.
- Documented the `FastRecovery` option in the config.
- Implemented `recoverFromCommitedLogs` function to recover the Raft node from committed logs.
- If `fastRecovery` is enabled and the log store implements `CommitTrackingLogStore`, the commit index is read from the store, avoiding the need to replay logs.
- Logs between the last applied and commit index are fed into the FSM for faster recovery.
…ency

- Refactor `ReadCommitIndex` to `GetCommitIndex` across `LogStore` and `InmemCommitTrackingStore`.
- Introduce `InmemCommitTrackingStore` to track commit index in memory for testing purposes.
- Add locking mechanism to safely read/write commit index in `InmemCommitTrackingStore`.
@lalalalatt lalalalatt changed the title Add CommitTrackingLogStore and its checker in LogStore Enhancement: persist commit index in LogStore to accelerate recovery Sep 6, 2024
api.go Outdated Show resolved Hide resolved
Copy link
Member

@banks banks left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hey @lalalalatt, thanks for working on this.

I spotted a couple of things that we should tweak here. I've added comments inline - let me know if anything is confusing. I didn't do a "full" final review yet but these should get it more inline with my original design. Feel free to explain if I'm misunderstanding anything though!

api.go Outdated Show resolved Hide resolved
raft.go Outdated Show resolved Hide resolved
@peterxcli
Copy link

peterxcli commented Sep 10, 2024

tl;dr, we're fine here 😅 (on the correctness issue, the other comments about this PR stand). We should be careful to preserve the behaviour in this PR - even if we only replay up to commitIndex on the FSM, we should still process any config changes in the rest of the log after that!

@banks Thanks for these lots of comments 😄

But, for the current change, it is possible to apply same configurations twice, although its wouldn't violate the correctness, it still be wasteful.
So I would reverse the order of r.recoverFromCommitedLogs() and "iterating the whole log to see if any logs are config changes", then, if the persisted commit are applied, then we only need to "iterating the whole log r.logs[max(lastapplied, snapshotIndex) + 1 : ] to see if any logs are config changes" and that prevent the duplication~
upd: reverse order

@peterxcli
Copy link

I don't think this is the right place for this call. I think we should call it in storeLogs just before the actual call to StoreLogs on the storage since that is the write we want our log stores to be able to add this state to disk in. Calling it here will not do anything until the next StoreLogs call at least in the design I proposed in the issue.

I don't think storing commit index every time before StoreLogs is a good idea.

For followers, when they are appending the log received from leader, the commit index may not increase, because leader may not receive majority agreement on the corresponding logs.

For leader, as I mention previously, it persist logs by calling log.StoreLogs() when receiving new log from applyCh which client requested, but that doesn't mean the logs are committed, unless majority accept them.

My idea is: the commit index only update at two place:

  1. receive higher leaderCommit when member is follower
  2. leader's commitment detect majority have accepted a higher log index, then it would send a message to c.commitCh, then the leader main thread would receive that new commit index and actually update it in raft in-memory state.

They both trigger r.processLogs eventually, so we can just update the commit index there.

Maybe that's my misunderstanding 😅, if there is anything wrong, please correct me~

@banks
Copy link
Member

banks commented Sep 10, 2024

Hi @peterxcli

The idea behind persisting commit index during StoreLogs is to make sure it doesn't cause worse performance. Writing to disk is the slowest part of raft in most implementations.

If we add a new write to disk anywhere then performance will decrease and that's not acceptable for our products at least and probably not to other users of this library.

So the whole design was intended to persist the current known commit index along with the logs every time we write new logs - yes it's a few extra bytes but it's the fsync calls that are slow so if we can write in the same write it's virtually "free" for all the LogStore implemetations we use currently and doesn't impact performance.

For followers, when they are appending the log received from leader, the commit index may not increase, because leader may not receive majority agreement on the corresponding logs.

This is true, but the AppendEntries RPC is the method that a follower learns about previous commits. In a given AppendEntries RPC, there will be one or more new logs (ignoring heartbeats for now), lets say those are logs {1000, 1001} and the leader will also include CommitIndex = 998 which lets this follower know that everything up to 998 is comitted. My proposal is that we call store.SetCommitIndex() with the commit index the leader just sent in each AppendEntries RPC before we call StoreLogs then the log store can just include that meta data in the same write and we always have persisted the absolute most up to date commit info each follower knows, all without any additional disk fsyncs.

If we only call this during processLogs on the follower as you propose then there are two possibilities depending on how the LogStore implements it:

  1. The LogStore buffers that index in memory until the next call to StoreLogs - that would work and have the same performance as my proposal, but I think it's slightly worse than my proposal for two reasons:
    1. If commitIndex sent by the leader is not actually persisted until the next RPC arrives which means that whenever the node restarts it can only restore a smaller number of logs even though it did "know" about and possibly apply more logs than it's persisted commitIndex before it restarted. This difference is likely small, but it's easy to avoid this.
    2. I find it much more surprising and hard to reason about if we actually don't persist the commitIndex we learned from the leader until the next RPC (where we'll learn about a new commit index most likely).
  2. Or, the LogStore could write the commitIndex to disk synchronously when SetCommitIndex is called. This would be a whole extra fsync (or two for BoltDB or WAL implementations) which would likely cause a significant slow down in the speed we can write to raft. I don't think we should consider this option especially when there's an easy way to avoid it (my proposal).

For leader, as I mention previously, it persist logs by calling log.StoreLogs() when receiving new log from applyCh which client requested, but that doesn't mean the logs are committed, unless majority accept them.

You're correct that all the logs we store when we call StoreLogs on the leader are not yet committed, in fact, with our current implementation we guarantee that none of them are committed (except in the trivial case of a single node cluster) because we don't replicate to followers until after this. But again the leader would not be calling SetCommitIndex with the new log index, it would just be calling it with whatever it's current in-memory commitIndex is which is guaranteed to be some way behind.

Again this is so that we can "re-use" the fsync on the LogStore and so avoid more disk IO or waiting on the critical path of writing data.

Most of the same arguments as above apply here: if we wait until commitIndex updates and processLogs is called on the leader we'll either have a more confusing model where the commitIndex persisted is a batch behind the last known state on the leader for any given set of logs, or we'll make it all slower by adding a new sync disk write just to persist this state during processLogs.

As an aside, it we were OK with adding more disk writes, I'd not have proposed this as an extension of LogStore at all - we already have a StableStore that we could just add a new KV pair to. But if we did that and updated it on every commit it would massively impact the throughput of a raft cluster since disk writes (and fsyncs) are the slowest part and we'd effectively double the number of them on both leader and follower for every commit.

Does that make sense?

@peterxcli
Copy link

@banks oh~~ It seems like I completely misunderstood your design.

Again this is so that we can "re-use" the fsync on the LogStore and so avoid more disk IO or waiting on the critical path of writing data.

Got it, thanks~ 😄

As an aside, it we were OK with adding more disk writes, I'd not have proposed this as an extension of LogStore at all - we already have a StableStore that we could just add a new KV pair to. But if we did that and updated it on every commit it would massively impact the throughput of a raft cluster since disk writes (and fsyncs) are the slowest part and we'd effectively double the number of them on both leader and follower for every commit.

That's true 😅

@banks Thanks for your clarification very much~

- Implemented commit tracking logs in the Raft cluster configuration.
- Added tests for restoring snapshots on startup with commit tracking logs.
- Introduced a fast recovery test to ensure logs are applied correctly after restart.
- Updated `MakeClusterOpts` to include `CommitTrackingLogs` option.
@peterxcli
Copy link

Hi @banks,

I’ve just added some tests for the fast recovery feature, and everything seems to be working well. I’d really appreciate any feedback or advice you might have.

Thank you!

Copy link
Member

@banks banks left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hey Peter,

This is looking awesome, thanks for your hard work!

I have a couple of suggestions inline. I'm not super hard line on any of them if you think there's a good reason to reject so love to hear your thoughts but I'd lean towards them being an improvement that will help us maintain or reason about this in the future!

config.go Outdated Show resolved Hide resolved
inmem_store.go Show resolved Hide resolved
raft.go Outdated Show resolved Hide resolved
raft.go Outdated Show resolved Hide resolved
Updated the function name from `persistCommitIndex` to `tryPersistCommitIndex` to better reflect its behavior. This function now updates the commit index in the persistent store only if fast recovery is enabled and if the log store implements `CommitTrackingLogStore`. Adjusted references to this function in `dispatchLogs` and `appendEntries`.
…ence

- Rename SetCommitIndex to StagCommitIndex in CommitTrackingLogStore interface
- Add detailed documentation for StagCommitIndex method
- Update raft.go to use StagCommitIndex instead of SetCommitIndex
- Optimize commit index staging in appendEntries using min(lastNewIndex, leaderCommitIndex)

This change ensures commit index updates are only persisted atomically
with the following StoreLogs call, preventing inconsistencies between the
commit index and log entries in case of crashes. It also optimizes the
staged commit index to be as up-to-date as possible without exceeding the
last new entry index, reducing commit index lag in the CommitTrackingStore.
The contract for implementations is clarified, specifying that GetCommitIndex
must never return a value higher than the last index in the log.
@peterxcli
Copy link

peterxcli commented Sep 20, 2024

@banks Thanks for the advice of the commit index staging.
I have make some changes on it.
Would like to see your further opinion on it, thx~

Copy link
Member

@banks banks left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is looking awesome Peter!

Thanks again. There is one suggested minor improvement in being more defensive which is maybe questionable but I think just about seems worth it to me. There's also a typo in the new interface name and the testing implementation didn't update with it so a couple of minor fixes and it's hopefully good to go!

I think once we have those fixed up, I'll try and get some other reviewers internally to look before we make a merge decision. We'll may also look to get the BoltDB and WAL implementations PRed so we can test the whole thing end-to-end before we merge any of these to be totally sure it actually works as expected.

api.go Outdated Show resolved Hide resolved
inmem_store.go Show resolved Hide resolved
log.go Outdated Show resolved Hide resolved
inmem_store.go Outdated Show resolved Hide resolved
raft.go Outdated Show resolved Hide resolved
}
assert.LessOrEqual(t, snap.Index, commitIdx)
assert.LessOrEqual(t, commitIdx, lastIdx)
}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm assuming these tests are failing right now because the InmemStore wasn't updated to match the new interface right? If they are passing for you then it might be worth a look!

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It passes😱

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hi @banks
First I want to thanks that you reviewed so many changes and gave lots of advices and opinions. 😁


Back to this topic, I think this test case is ok🤔.

Because basically the commit index stored in the store would always lower than the last log index in store (there is always one StoreLogs call lag).

As the comment "Expect: snap.Index --- commitIdx --- lastIdx" leaved, I think we can't sure what is the exact position of the commitIndex in every test, so I just test the interval only.
But now we find this would lead to another problem - we can't even detect the commit store interface and its implementation aren't match😱.
We can simply solve that detection issue by changing assert.LessOrEqual to assert.Less, but that would another flaky problem because of the uncertainty of the commit index.

What do you think?

- Rename SetCommitIndex to StageCommitIndex in InmemCommitTrackingStore
- Fix typo in CommitTrackingLogStore interface (StagCommitIndex to StageCommitIndex)
- Update error message in tryStageCommitIndex for consistency
- Ensure CommitTrackingLogStore extends LogStore interface
…idation

- Add error logging and panic for critical errors during recovery
- Replace lastApplied check with lastIndex comparison
- Ensure commitIndex does not exceed lastIndex
- Improve code structure and readability
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

4 participants