diff --git a/dkron/agent.go b/dkron/agent.go
index a261f397c..5f163cd77 100644
--- a/dkron/agent.go
+++ b/dkron/agent.go
@@ -113,9 +113,10 @@ type Agent struct {
 
 	// peers is used to track the known Dkron servers. This is
 	// used for region forwarding and clustering.
-	peers      map[string][]*ServerParts
-	localPeers map[raft.ServerAddress]*ServerParts
-	peerLock   sync.RWMutex
+	peers        map[string][]*ServerParts
+	localPeers   map[raft.ServerAddress]*ServerParts
+	peerLock     sync.RWMutex
+	serverLookup *ServerLookup
 
 	activeExecutions sync.Map
 
@@ -142,8 +143,9 @@ type AgentOption func(agent *Agent)
 // and running a Dkron instance.
 func NewAgent(config *Config, options ...AgentOption) *Agent {
 	agent := &Agent{
-		config:      config,
-		retryJoinCh: make(chan error),
+		config:       config,
+		retryJoinCh:  make(chan error),
+		serverLookup: NewServerLookup(),
 	}
 
 	for _, option := range options {
@@ -316,7 +318,13 @@ func (a *Agent) setupRaft() error {
 		logger = a.logger.Logger.Writer()
 	}
 
-	transport := raft.NewNetworkTransport(a.raftLayer, 3, raftTimeout, logger)
+	transConfig := &raft.NetworkTransportConfig{
+		Stream:                a.raftLayer,
+		MaxPool:               3,
+		Timeout:               raftTimeout,
+		ServerAddressProvider: a.serverLookup,
+	}
+	transport := raft.NewNetworkTransportWithConfig(transConfig)
 	a.raftTransport = transport
 
 	config := raft.DefaultConfig()
@@ -710,7 +718,10 @@ func (a *Agent) eventLoop() {
 			a.localMemberEvent(me)
 		case serf.EventMemberReap:
 			a.localMemberEvent(me)
-		case serf.EventMemberUpdate, serf.EventUser, serf.EventQuery: // Ignore
+		case serf.EventMemberUpdate:
+			a.lanNodeUpdate(me)
+			a.localMemberEvent(me)
+		case serf.EventUser, serf.EventQuery: // Ignore
 		default:
 			a.logger.WithField("event", e.String()).Warn("agent: Unhandled serf event")
 		}
diff --git a/dkron/agent_test.go b/dkron/agent_test.go
index 37ddc4ba1..78d3d62d2 100644
--- a/dkron/agent_test.go
+++ b/dkron/agent_test.go
@@ -16,7 +16,7 @@ import (
 )
 
 var (
-	logLevel = "error"
+	logLevel = "info"
 )
 
 func TestAgentCommand_runForElection(t *testing.T) {
diff --git a/dkron/leader.go b/dkron/leader.go
index 1605cbe6e..1d3119517 100644
--- a/dkron/leader.go
+++ b/dkron/leader.go
@@ -3,6 +3,7 @@ package dkron
 import (
 	"fmt"
 	"net"
+	"strings"
 	"sync"
 	"time"
 
@@ -284,17 +285,12 @@ func (a *Agent) addRaftPeer(m serf.Member, parts *ServerParts) error {
 			if server.Address == raft.ServerAddress(addr) && server.ID == raft.ServerID(parts.ID) {
 				return nil
 			}
-			future := a.raft.RemoveServer(server.ID, 0, 0)
 			if server.Address == raft.ServerAddress(addr) {
+				future := a.raft.RemoveServer(server.ID, 0, 0)
 				if err := future.Error(); err != nil {
 					return fmt.Errorf("error removing server with duplicate address %q: %s", server.Address, err)
 				}
 				a.logger.WithField("server", server.Address).Info("dkron: removed server with duplicate address")
-			} else {
-				if err := future.Error(); err != nil {
-					return fmt.Errorf("error removing server with duplicate ID %q: %s", server.ID, err)
-				}
-				a.logger.WithField("server", server.ID).Info("dkron: removed server with duplicate ID")
 			}
 		}
 	}
@@ -315,6 +311,15 @@ func (a *Agent) addRaftPeer(m serf.Member, parts *ServerParts) error {
 // removeRaftPeer is used to remove a Raft peer when a dkron server leaves
 // or is reaped
 func (a *Agent) removeRaftPeer(m serf.Member, parts *ServerParts) error {
+
+	// Do not remove ourself. This can only happen if the current leader
+	// is leaving. Instead, we should allow a follower to take over and
+	// deregister us later.
+	if strings.EqualFold(m.Name, a.config.NodeName) {
+		a.logger.WithField("name", a.config.NodeName).Warn("removing self should be done by follower")
+		return nil
+	}
+
 	// See if it's already in the configuration. It's harmless to re-remove it
 	// but we want to avoid doing that if possible to prevent useless Raft
 	// log entries.
diff --git a/dkron/serf.go b/dkron/serf.go
index 6311aee02..5398ddbd8 100644
--- a/dkron/serf.go
+++ b/dkron/serf.go
@@ -2,6 +2,7 @@ package dkron
 
 import (
 	"strings"
+	"time"
 	"github.com/hashicorp/raft"
 	"github.com/hashicorp/serf/serf"
 )
@@ -11,6 +12,9 @@ const (
 	// StatusReap is used to update the status of a node if we
 	// are handling a EventMemberReap
 	StatusReap = serf.MemberStatus(-1)
+
+	// maxPeerRetries limits how many attempts are made to query a server for its Raft peers
+	maxPeerRetries = 6
 )
 
 // nodeJoin is used to handle join events on the serf cluster
@@ -21,8 +25,8 @@ func (a *Agent) nodeJoin(me serf.MemberEvent) {
 			a.logger.WithField("member", m.Name).Warn("non-server in gossip pool")
 			continue
 		}
-		a.logger.WithField("server", parts.Name).Info("adding server")
-
+		a.logger.WithField("server", parts.Name).Info("Adding LAN server")
+		a.serverLookup.AddServer(parts)
 		// Check if this server is known
 		found := false
 		a.peerLock.Lock()
@@ -110,7 +114,46 @@ func (a *Agent) maybeBootstrap() {
 		return
 	}
 
-	// TODO: Query each of the servers and make sure they report no Raft peers.
+	// Query each of the servers and make sure they report no Raft peers.
+	for _, server := range servers {
+		var peers []string
+
+		// Retry with exponential backoff to get peer status from this server
+		for attempt := uint(0); attempt < maxPeerRetries; attempt++ {
+			configuration, err := a.GRPCClient.RaftGetConfiguration(server.RPCAddr.String())
+			if err != nil {
+				nextRetry := (1 << attempt) * time.Second
+				a.logger.
+					WithError(err).
+					WithField("server", server.Name).
+					WithField("retry_interval", nextRetry.String()).
+					Error("Failed to confirm peer status for server (will retry)")
+				time.Sleep(nextRetry)
+			} else {
+				for _, peer := range configuration.Servers {
+					peers = append(peers, peer.Id)
+				}
+				break
+			}
+		}
+
+		// Found a node with some Raft peers, stop bootstrap since there's
+		// evidence of an existing cluster. We should get folded in by the
+		// existing servers if that's the case, so it's cleaner to sit as a
+		// candidate with no peers so we don't cause spurious elections.
+		// It's OK this is racy, because even with an initial bootstrap
+		// as long as one peer runs bootstrap things will work, and if we
+		// have multiple peers bootstrap in the same way, that's OK. We
+		// just don't want a server added much later to do a live bootstrap
+		// and interfere with the cluster. This isn't required for Raft's
+		// correctness because no server in the existing cluster will vote
+		// for this server, but it makes things much more stable.
+		if len(peers) > 0 {
+			a.logger.WithField("server", server.Name).Info("Existing Raft peers reported by server, disabling bootstrap mode")
+			a.config.BootstrapExpect = 0
+			return
+		}
+	}
 
 	// Update the peer set
 	// Attempt a live bootstrap!
@@ -174,6 +217,7 @@ func (a *Agent) nodeFailed(me serf.MemberEvent) {
 			delete(a.localPeers, raft.ServerAddress(parts.Addr.String()))
 		}
 		a.peerLock.Unlock()
+		a.serverLookup.RemoveServer(parts)
 	}
 }
 
@@ -200,3 +244,16 @@ func (a *Agent) localMemberEvent(me serf.MemberEvent) {
 		}
 	}
 }
+
+func (a *Agent) lanNodeUpdate(me serf.MemberEvent) {
+	for _, m := range me.Members {
+		ok, parts := isServer(m)
+		if !ok {
+			continue
+		}
+		a.logger.WithField("server", parts.String()).Info("Updating LAN server")
+
+		// Update server lookup
+		a.serverLookup.AddServer(parts)
+	}
+}
diff --git a/dkron/server_lookup.go b/dkron/server_lookup.go
new file mode 100644
index 000000000..2343cc433
--- /dev/null
+++ b/dkron/server_lookup.go
@@ -0,0 +1,76 @@
+package dkron
+
+import (
+	"fmt"
+	"sync"
+
+	"github.com/hashicorp/raft"
+)
+
+// ServerLookup encapsulates looking up servers by id and address
+type ServerLookup struct {
+	lock            sync.RWMutex
+	addressToServer map[raft.ServerAddress]*ServerParts
+	idToServer      map[raft.ServerID]*ServerParts
+}
+
+func NewServerLookup() *ServerLookup {
+	return &ServerLookup{
+		lock:            sync.RWMutex{},
+		addressToServer: make(map[raft.ServerAddress]*ServerParts),
+		idToServer:      make(map[raft.ServerID]*ServerParts),
+	}
+}
+
+func (sl *ServerLookup) AddServer(server *ServerParts) {
+	sl.lock.Lock()
+	defer sl.lock.Unlock()
+	sl.addressToServer[raft.ServerAddress(server.RPCAddr.String())] = server
+	sl.idToServer[raft.ServerID(server.ID)] = server
+}
+
+func (sl *ServerLookup) RemoveServer(server *ServerParts) {
+	sl.lock.Lock()
+	defer sl.lock.Unlock()
+	delete(sl.addressToServer, raft.ServerAddress(server.RPCAddr.String()))
+	delete(sl.idToServer, raft.ServerID(server.ID))
+}
+
+// ServerAddr implements the raft.ServerAddressProvider interface
+func (sl *ServerLookup) ServerAddr(id raft.ServerID) (raft.ServerAddress, error) {
+	sl.lock.RLock()
+	defer sl.lock.RUnlock()
+	svr, ok := sl.idToServer[id]
+	if !ok {
+		return "", fmt.Errorf("Could not find address for server id %v", id)
+	}
+	return raft.ServerAddress(svr.RPCAddr.String()), nil
+}
+
+// Server looks up the server by address, returns nil if not found
+func (sl *ServerLookup) Server(addr raft.ServerAddress) *ServerParts {
+	sl.lock.RLock()
+	defer sl.lock.RUnlock()
+	return sl.addressToServer[addr]
+}
+
+func (sl *ServerLookup) Servers() []*ServerParts {
+	sl.lock.RLock()
+	defer sl.lock.RUnlock()
+	var ret []*ServerParts
+	for _, svr := range sl.addressToServer {
+		ret = append(ret, svr)
+	}
+	return ret
+}
+
+func (sl *ServerLookup) CheckServers(fn func(srv *ServerParts) bool) {
+	sl.lock.RLock()
+	defer sl.lock.RUnlock()
+
+	for _, srv := range sl.addressToServer {
+		if !fn(srv) {
+			return
+		}
+	}
+}
diff --git a/dkron/server_lookup_test.go b/dkron/server_lookup_test.go
new file mode 100644
index 000000000..92a59aa6d
--- /dev/null
+++ b/dkron/server_lookup_test.go
@@ -0,0 +1,91 @@
+package dkron
+
+import (
+	"testing"
+
+	"github.com/hashicorp/raft"
+	"github.com/stretchr/testify/require"
+)
+
+type testAddr struct {
+	addr string
+}
+
+func (ta *testAddr) Network() string {
+	return "tcp"
+}
+
+func (ta *testAddr) String() string {
+	return ta.addr
+}
+
+func TestAddServer(t *testing.T) {
+	// arrange
+	lookup := NewServerLookup()
+	id1, addr1 := "server-1", "127.0.0.1:8300"
+	id2, addr2 := "server-2", "127.0.0.2:8300"
+	server1, server2 := buildServerParts(id1, addr1), buildServerParts(id2, addr2)
+
+	// act
+	lookup.AddServer(server1)
+	lookup.AddServer(server2)
+
+	// assert
+	servers := lookup.Servers()
+	expectedServers := []*ServerParts{server1, server2}
+	require.EqualValuesf(t, expectedServers, servers, "Expected %v but got %v", expectedServers, servers)
+
+	got, err := lookup.ServerAddr(raft.ServerID(id1))
+	require.NoErrorf(t, err, "Unexpected error: %v", err)
+	require.EqualValuesf(t, addr1, string(got), "Expected %v but got %v", addr1, got)
+
+	server := lookup.Server(raft.ServerAddress(addr1))
+	strAddr := server.RPCAddr.String()
+	require.EqualValuesf(t, addr1, strAddr, "Expected lookup to return address %v but got %v", addr1, strAddr)
+
+	got, err = lookup.ServerAddr(raft.ServerID(id2))
+	require.NoErrorf(t, err, "Unexpected error: %v", err)
+	require.EqualValuesf(t, addr2, string(got), "Expected %v but got %v", addr2, got)
+
+	server = lookup.Server(raft.ServerAddress(addr2))
+	strAddr = server.RPCAddr.String()
+	require.EqualValuesf(t, addr2, strAddr, "Expected lookup to return address %v but got %v", addr2, strAddr)
+}
+
+func TestRemoveServer(t *testing.T) {
+	// arrange
+	lookup := NewServerLookup()
+	id1, addr1 := "server-1", "127.0.0.1:8300"
+	id2, addr2 := "server-2", "127.0.0.2:8300"
+	server1, server2 := buildServerParts(id1, addr1), buildServerParts(id2, addr2)
+	lookup.AddServer(server1)
+	lookup.AddServer(server2)
+
+	// act
+	lookup.RemoveServer(server1)
+
+	// assert
+	servers := lookup.Servers()
+	expectedServers := []*ServerParts{server2}
+	require.EqualValuesf(t, expectedServers, servers, "Expected %v but got %v", expectedServers, servers)
+
+	require.Nilf(t, lookup.Server(raft.ServerAddress(addr1)), "Expected lookup to return nil")
+	addr, err := lookup.ServerAddr(raft.ServerID(id1))
+	require.Errorf(t, err, "Expected lookup to return error")
+	require.EqualValuesf(t, "", string(addr), "Expected empty address but got %v", addr)
+
+	got, err := lookup.ServerAddr(raft.ServerID(id2))
+	require.NoErrorf(t, err, "Unexpected error: %v", err)
+	require.EqualValuesf(t, addr2, string(got), "Expected %v but got %v", addr2, got)
+
+	server := lookup.Server(raft.ServerAddress(addr2))
+	strAddr := server.RPCAddr.String()
+	require.EqualValuesf(t, addr2, strAddr, "Expected lookup to return address %v but got %v", addr2, strAddr)
+}
+
+func buildServerParts(id, addr string) *ServerParts {
+	return &ServerParts{
+		ID:      id,
+		RPCAddr: &testAddr{addr},
+	}
+}