net/raft: save snapshot position first #1312

Merged · 4 commits · Jun 14, 2017
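This change makes net/raft persist the snapshot position to the write-ahead log before replacing the snapshot file on disk. The wal.SaveSnapshot calls that previously lived in runUpdatesReady and join move into saveSnapshot, which now writes the position first and the snapshot data second (see coreos/etcd#8082). To support this, the WAL handle and node ID become Service fields (sv.wal, sv.id), so saveSnapshot and config no longer take them as parameters. A sketch of the recovery-side invariant this protects follows the saveSnapshot hunk at the end of the diff.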
2 changes: 1 addition & 1 deletion generated/rev/RevId.java
@@ -1,4 +1,4 @@
 
 public final class RevId {
-    public final String Id = "main/rev3243";
+    public final String Id = "main/rev3244";
 }
2 changes: 1 addition & 1 deletion generated/rev/revid.go
@@ -1,3 +1,3 @@
 package rev
 
-const ID string = "main/rev3243"
+const ID string = "main/rev3244"
2 changes: 1 addition & 1 deletion generated/rev/revid.js
@@ -1,2 +1,2 @@
 
-export const rev_id = "main/rev3243"
+export const rev_id = "main/rev3244"
2 changes: 1 addition & 1 deletion generated/rev/revid.rb
@@ -1,4 +1,4 @@
 
 module Chain::Rev
-  ID = "main/rev3243".freeze
+  ID = "main/rev3244".freeze
 end
107 changes: 53 additions & 54 deletions net/raft/raft.go
@@ -98,6 +98,7 @@ type Service struct {
     // it is ok to read without keeping startMu locked in
     // code paths where Service is known to be initialized.
     startMu  sync.Mutex
+    wal      *wal.WAL
     raftNode raft.Node
    id       uint64
 
@@ -215,14 +216,15 @@ func Start(laddr, dir string, httpClient *http.Client, state State) (*Service, error) {
     sv.mux.HandleFunc("/raft/join", sv.serveJoin)
     sv.mux.HandleFunc("/raft/msg", sv.serveMsg)
 
-    walobj, err := sv.recover()
+    var err error
+    sv.wal, err = sv.recover()
     if err != nil {
         return nil, err
     }
     // If there's no WAL, then this is a new node. The caller is responsible
     // for calling either Init to initialize a new cluster or Join to join
     // an existing cluster.
-    if walobj == nil {
+    if sv.wal == nil {
         return sv, nil
     }
 
@@ -231,25 +233,25 @@ func Start(laddr, dir string, httpClient *http.Client, state State) (*Service, error) {
         return nil, errors.Wrap(err)
     }
 
-    raftNode := raft.RestartNode(sv.config(id))
+    sv.id = id
+    raftNode := raft.RestartNode(sv.config())
     err = raftNode.Campaign(ctx)
     if err != nil {
         log.Error(ctx, err, "election failed") // ok to continue
     }
 
     // Start the algorithm. It is okay to not lock startMu since
     // sv hasn't escaped yet.
-    sv.id = id
     sv.raftNode = raftNode
-    sv.startLocked(walobj)
+    sv.startLocked()
 
     return sv, nil
 }
 
 // startLocked begins the raft algorithm. It requires sv.startMu
 // to already be locked.
-func (sv *Service) startLocked(walobj *wal.WAL) {
-    go sv.runUpdates(walobj)
+func (sv *Service) startLocked() {
+    go sv.runUpdates()
     go runTicks(sv.raftNode)
 }

@@ -262,9 +264,9 @@ func (sv *Service) initialized() bool {
     return sv.raftNode != nil
 }
 
-func (sv *Service) config(id uint64) *raft.Config {
+func (sv *Service) config() *raft.Config {
     return &raft.Config{
-        ID:              id,
+        ID:              sv.id,
         ElectionTick:    electionTick,
         HeartbeatTick:   heartbeatTick,
         Storage:         sv.raftStorage,
@@ -296,24 +298,24 @@ func (sv *Service) Init() error {
     if err != nil {
         return errors.Wrap(err)
     }
-    walobj, err := wal.Create(sv.walDir(), nil)
+    sv.wal, err = wal.Create(sv.walDir(), nil)
     if err != nil {
         return errors.Wrap(err)
     }
 
-    peers := []raft.Peer{{ID: firstNodeID, Context: []byte(sv.laddr)}}
-    raftNode := raft.StartNode(sv.config(firstNodeID), peers)
+    sv.id = firstNodeID
+    peers := []raft.Peer{{ID: sv.id, Context: []byte(sv.laddr)}}
+    raftNode := raft.StartNode(sv.config(), peers)
 
     sv.raftNode = raftNode
 
     // StartNode appends to the initial log a ConfChangeAddNode entry for
     // each peer (in our case, just this node). We can't campaign until
     // this entry is applied, so synchronously apply them before continuing.
     rd := <-raftNode.Ready()
-    sv.runUpdatesReady(rd, walobj, map[string]chan bool{})
+    sv.runUpdatesReady(rd, sv.wal, map[string]chan bool{})
 
-    sv.startLocked(walobj)
+    sv.startLocked()
 
     // campaign immediately to avoid waiting electionTick ticks in tests
     err = raftNode.Campaign(ctx)
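The comment in the Init hunk above explains why the first Ready is drained synchronously. As a minimal sketch of that bootstrap pattern (assuming the etcd raft API this package uses; bootstrap and the storage parameter are illustrative names, not part of this PR):

package raftsketch

import (
    "context"

    "github.com/coreos/etcd/raft"
    "github.com/coreos/etcd/raft/raftpb"
)

func bootstrap(ctx context.Context, cfg *raft.Config, storage *raft.MemoryStorage) (raft.Node, error) {
    // StartNode seeds the log with one ConfChangeAddNode entry per peer.
    n := raft.StartNode(cfg, []raft.Peer{{ID: cfg.ID}})

    // Campaigning can only succeed after those entries are applied, so the
    // first Ready is handled inline rather than in the runUpdates goroutine.
    // (A full loop would also persist rd.HardState and rd.Snapshot.)
    rd := <-n.Ready()
    if err := storage.Append(rd.Entries); err != nil {
        return nil, err
    }
    for _, ent := range rd.CommittedEntries {
        if ent.Type == raftpb.EntryConfChange {
            var cc raftpb.ConfChange
            if err := cc.Unmarshal(ent.Data); err != nil {
                return nil, err
            }
            n.ApplyConfChange(cc)
        }
    }
    n.Advance()

    // Now a campaign can succeed immediately, without waiting electionTick ticks.
    return n, n.Campaign(ctx)
}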
@@ -337,13 +339,12 @@ func (sv *Service) Join(bootURL string) error {
         return ErrExistingCluster
     }
 
-    id, walobj, err := sv.join(sv.laddr, bootURL) // sets state
+    err := sv.join(sv.laddr, bootURL) // sets state, id, wal
     if err != nil {
         return err
     }
-    sv.id = id
-    sv.raftNode = raft.RestartNode(sv.config(id))
-    sv.startLocked(walobj)
+    sv.raftNode = raft.RestartNode(sv.config())
+    sv.startLocked()
     return nil
 }

@@ -363,16 +364,6 @@ func (sv *Service) runUpdatesReady(rd raft.Ready, wal *wal.WAL, writers map[string]chan bool) {
     sv.redo(func() error {
         return sv.saveSnapshot(&rd.Snapshot)
     })
-    sv.redo(func() error {
-        // Note: wal.SaveSnapshot saves the snapshot *position*,
-        // not the actual full snapshot data.
-        // That happens in sv.saveSnapshot just above.
-        // (So don't worry, we're not saving it twice.)
-        return wal.SaveSnapshot(walpb.Snapshot{
-            Index: rd.Snapshot.Metadata.Index,
-            Term:  rd.Snapshot.Metadata.Term,
-        })
-    })
     err := wal.ReleaseLockTo(rd.Snapshot.Metadata.Index)
     if err != nil {
         panic(err)
@@ -420,14 +411,14 @@ func replyReadIndex(rdIndices map[string]chan uint64, readStates []raft.ReadState) {
 
 // runUpdates runs forever, reading and processing updates from raft
 // onto local storage.
-func (sv *Service) runUpdates(wal *wal.WAL) {
+func (sv *Service) runUpdates() {
     rdIndices := make(map[string]chan uint64)
     writers := make(map[string]chan bool)
     for {
         select {
         case rd := <-sv.raftNode.Ready():
             replyReadIndex(rdIndices, rd.ReadStates)
-            sv.runUpdatesReady(rd, wal, writers)
+            sv.runUpdatesReady(rd, sv.wal, writers)
         case req := <-sv.rctxReq:
             if req.index == nil {
                 delete(rdIndices, string(req.rctx))
@@ -694,12 +685,12 @@ func (sv *Service) serveJoin(w http.ResponseWriter, req *http.Request) {
 // It requests an existing member to propose a configuration change
 // adding the local process as a new member, then retrieves its new ID
 // and a snapshot of the cluster state and applies it to sv.
-func (sv *Service) join(addr, baseURL string) (id uint64, walobj *wal.WAL, err error) {
+func (sv *Service) join(addr, baseURL string) error {
     reqURL := strings.TrimRight(baseURL, "/") + "/raft/join"
     b, _ := json.Marshal(struct{ Addr string }{addr})
     resp, err := sv.client.Post(reqURL, contentType, bytes.NewReader(b))
     if err != nil {
-        return 0, nil, errors.Wrap(err)
+        return errors.Wrap(err)
     }
     defer resp.Body.Close()
 
@@ -714,63 +705,56 @@ func (sv *Service) join(addr, baseURL string) (id uint64, walobj *wal.WAL, err error) {
                 err = errors.WithDetail(ErrPeerUninitialized, detail)
             }
         }
-        return 0, nil, errors.Wrap(err, "joining cluster")
+        return errors.Wrap(err, "joining cluster")
     }
 
     var x nodeJoin
     err = json.NewDecoder(resp.Body).Decode(&x)
     if err != nil {
-        return 0, nil, errors.Wrap(err)
+        return errors.Wrap(err)
     }
-    id = x.ID
+    sv.id = x.ID
     var raftSnap raftpb.Snapshot
     err = decodeSnapshot(x.Snap, &raftSnap)
     if err != nil {
-        return 0, nil, errors.Wrap(err)
+        return errors.Wrap(err)
     }
 
     ctx := context.Background()
     err = sv.raftStorage.ApplySnapshot(raftSnap)
     if err != nil {
-        return 0, nil, errors.Wrap(err)
+        return errors.Wrap(err)
     }
 
-    log.Printkv(ctx, "raftid", id)
-    err = writeID(sv.dir, id)
+    log.Printkv(ctx, "raftid", sv.id)
+    err = writeID(sv.dir, sv.id)
     if err != nil {
-        return 0, nil, err
+        return errors.Wrap(err)
     }
 
     err = os.Remove(sv.walDir())
     if err != nil {
-        return 0, nil, errors.Wrap(err)
+        return errors.Wrap(err)
     }
-    walobj, err = wal.Create(sv.walDir(), nil)
+    sv.wal, err = wal.Create(sv.walDir(), nil)
     if err != nil {
-        return 0, nil, errors.Wrap(err)
+        return errors.Wrap(err)
     }
 
     if !raft.IsEmptySnap(raftSnap) {
         err := sv.saveSnapshot(&raftSnap)
         if err != nil {
-            return 0, nil, errors.Wrap(err)
-        }
-        err = walobj.SaveSnapshot(walpb.Snapshot{
-            Index: raftSnap.Metadata.Index,
-            Term:  raftSnap.Metadata.Term,
-        })
-        if err != nil {
-            return 0, nil, errors.Wrap(err)
+            return errors.Wrap(err)
         }
         err = sv.state.RestoreSnapshot(raftSnap.Data, raftSnap.Metadata.Index)
         if err != nil {
-            return 0, nil, errors.Wrap(err)
+            return errors.Wrap(err)
         }
         sv.confState = raftSnap.Metadata.ConfState
         sv.snapIndex = raftSnap.Metadata.Index
     }
     log.Printkv(ctx, "at", "joined", "appliedindex", raftSnap.Metadata.Index)
-    return id, walobj, nil
+    return nil
 }
 
 func encodeSnapshot(snapshot *raftpb.Snapshot) ([]byte, error) {
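Note that join no longer saves the WAL snapshot position itself: sv.wal is created by wal.Create before sv.saveSnapshot runs, so the position write now happens inside saveSnapshot with the same Index and Term the removed walpb.Snapshot block supplied.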
@@ -945,6 +929,7 @@ func (sv *Service) getSnapshot() *raftpb.Snapshot {
 
 func (sv *Service) triggerSnapshot() error {
     snap := sv.getSnapshot()
+
     err := sv.saveSnapshot(snap)
     if err != nil {
         return errors.Wrap(err)
@@ -967,6 +952,20 @@ func (sv *Service) saveSnapshot(snapshot *raftpb.Snapshot) error {
     if err != nil {
         panic(err)
     }
+
+    // First, write the index of the snapshot to the WAL. This
+    // ensures we never try to open the WAL at an index that was
+    // not saved to the WAL.
+    // https://github.com/coreos/etcd/issues/8082
+    err = sv.wal.SaveSnapshot(walpb.Snapshot{
+        Index: snapshot.Metadata.Index,
+        Term:  snapshot.Metadata.Term,
+    })
+    if err != nil {
+        return err
+    }
+
+    // Then atomically replace the on-disk snapshot.
     return writeFile(sv.snapFile(), d, 0666)
 }
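The ordering enforced above matters on the read side: the wal package will only open a WAL at a position that a prior wal.SaveSnapshot recorded. A minimal sketch of that recovery path (assuming the etcd wal API; openWALAtSnapshot and loadSnapshotFile are hypothetical helpers, not code from this PR):

package raftsketch

import (
    "github.com/coreos/etcd/raft/raftpb"
    "github.com/coreos/etcd/wal"
    "github.com/coreos/etcd/wal/walpb"
)

// loadSnapshotFile stands in for reading the file that saveSnapshot
// writes with writeFile(sv.snapFile(), ...) above.
func loadSnapshotFile() raftpb.Snapshot { return raftpb.Snapshot{} }

func openWALAtSnapshot(dir string) (*wal.WAL, error) {
    snap := loadSnapshotFile()

    // If the snapshot file were replaced first and the process crashed
    // before wal.SaveSnapshot ran, this Open would ask the WAL for a
    // position it never recorded and fail (coreos/etcd#8082). Writing the
    // position first leaves a harmless window instead: at worst the WAL
    // records a snapshot whose file write never completed, and recovery
    // opens at the position of the previous, still-intact snapshot file.
    return wal.Open(dir, walpb.Snapshot{
        Index: snap.Metadata.Index,
        Term:  snap.Metadata.Term,
    })
}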
