Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Make cluster leaving more graceful #1503

Open
wants to merge 1 commit into
base: 4.x
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
93 changes: 85 additions & 8 deletions dkron/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,9 @@ import (

const (
raftTimeout = 30 * time.Second
// raftRemoveGracePeriod is how long we wait to allow a RemovePeer
// to replicate to gracefully leave the cluster.
raftRemoveGracePeriod = 5 * time.Second
// raftLogCacheSize is the maximum number of logs to cache in-memory.
// This is used to reduce disk I/O for the recently committed entries.
raftLogCacheSize = 512
Expand Down Expand Up @@ -264,24 +267,98 @@ func (a *Agent) Stop() error {
a.logger.Info("agent: Called member stop, now stopping")

if a.config.Server {
// shutdown scheduler
if a.sched.Started() {
<-a.sched.Stop().Done()
}
// if we are a server we should attempt to transfer leadership
isLeader := a.IsLeader()
if isLeader {
numVoters := 0
future := a.raft.GetConfiguration()
if err := future.Error(); err != nil {
a.logger.Warnf("failed to get raft configuration: %s", err.Error())
} else {
for _, server := range future.Configuration().Servers {
if server.Suffrage == raft.Voter {
numVoters++
}
}
}
// If there are more than one server, we can attempt to transfer leadership
if numVoters > 1 {
if err := a.raft.LeadershipTransfer(); err == nil {
isLeader = false
} else {
a.logger.Warnf("failed to transfer leadership: %s", err.Error())
future := a.raft.RemoveServer(raft.ServerID(a.config.NodeName), 0, 0)
if err := future.Error(); err != nil {
a.logger.Warnf("failed to remove ourself from raft: %s", err.Error())
}
}
}
}

// Leave serf cluster
if err := a.serf.Leave(); err != nil {
a.logger.Error("failed to leave serf cluster", "error", err)
return err
}

if err := a.serf.Shutdown(); err != nil {
a.logger.Error("failed to shutdown serf", "error", err)
return err
}

// If we aren't leader, wait for the removing from the cluster
if !isLeader {
left := false
// wait for our removal from the raft cluster
limit := time.Now().Add(raftRemoveGracePeriod)
addr := a.raftTransport.LocalAddr()
for !left && time.Now().Before(limit) {
// Sleep a while before we check.
time.Sleep(50 * time.Millisecond)

// Get the latest configuration.
future := a.raft.GetConfiguration()
if err := future.Error(); err != nil {
a.logger.Error("failed to get raft configuration", "error", err)
break
}

// See if we are no longer included.
left = true
for _, server := range future.Configuration().Servers {
if server.Address == addr {
left = false
break
}
}
}

if !left {
a.logger.Warn("failed to leave raft configuration gracefully, timeout")
}
}

// TODO: Check why Shutdown().Error() is not working
_ = a.raft.Shutdown()

if err := a.Store.Shutdown(); err != nil {
a.logger.Error("failed to shutdown store", "error", err)
return err
}
}

if err := a.serf.Leave(); err != nil {
return err
}

if err := a.serf.Shutdown(); err != nil {
return err
} else {
if err := a.serf.Leave(); err != nil {
a.logger.Error("failed to leave Serf cluster", "error", err)
return err
}
if err := a.serf.Shutdown(); err != nil {
a.logger.Error("failed to shutdown Serf", "error", err)
return err
}
return nil
}

return nil
Expand Down
Loading