Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Handle IP changes #1446

Merged
merged 12 commits into from
Feb 3, 2024
25 changes: 18 additions & 7 deletions dkron/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -113,9 +113,10 @@ type Agent struct {

// peers is used to track the known Dkron servers. This is
// used for region forwarding and clustering.
peers map[string][]*ServerParts
localPeers map[raft.ServerAddress]*ServerParts
peerLock sync.RWMutex
peers map[string][]*ServerParts
localPeers map[raft.ServerAddress]*ServerParts
peerLock sync.RWMutex
serverLookup *ServerLookup

activeExecutions sync.Map

Expand All @@ -142,8 +143,9 @@ type AgentOption func(agent *Agent)
// and running a Dkron instance.
func NewAgent(config *Config, options ...AgentOption) *Agent {
agent := &Agent{
config: config,
retryJoinCh: make(chan error),
config: config,
retryJoinCh: make(chan error),
serverLookup: NewServerLookup(),
}

for _, option := range options {
Expand Down Expand Up @@ -316,7 +318,13 @@ func (a *Agent) setupRaft() error {
logger = a.logger.Logger.Writer()
}

transport := raft.NewNetworkTransport(a.raftLayer, 3, raftTimeout, logger)
transConfig := &raft.NetworkTransportConfig{
Stream: a.raftLayer,
MaxPool: 3,
Timeout: raftTimeout,
ServerAddressProvider: a.serverLookup,
}
transport := raft.NewNetworkTransportWithConfig(transConfig)
a.raftTransport = transport

config := raft.DefaultConfig()
Expand Down Expand Up @@ -710,7 +718,10 @@ func (a *Agent) eventLoop() {
a.localMemberEvent(me)
case serf.EventMemberReap:
a.localMemberEvent(me)
case serf.EventMemberUpdate, serf.EventUser, serf.EventQuery: // Ignore
case serf.EventMemberUpdate:
a.lanNodeUpdate(me)
a.localMemberEvent(me)
case serf.EventUser, serf.EventQuery: // Ignore
default:
a.logger.WithField("event", e.String()).Warn("agent: Unhandled serf event")
}
Expand Down
2 changes: 1 addition & 1 deletion dkron/agent_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ import (
)

var (
logLevel = "error"
logLevel = "info"
)

func TestAgentCommand_runForElection(t *testing.T) {
Expand Down
17 changes: 11 additions & 6 deletions dkron/leader.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package dkron
import (
"fmt"
"net"
"strings"
"sync"
"time"

Expand Down Expand Up @@ -284,17 +285,12 @@ func (a *Agent) addRaftPeer(m serf.Member, parts *ServerParts) error {
if server.Address == raft.ServerAddress(addr) && server.ID == raft.ServerID(parts.ID) {
return nil
}
future := a.raft.RemoveServer(server.ID, 0, 0)
if server.Address == raft.ServerAddress(addr) {
future := a.raft.RemoveServer(server.ID, 0, 0)
if err := future.Error(); err != nil {
return fmt.Errorf("error removing server with duplicate address %q: %s", server.Address, err)
}
a.logger.WithField("server", server.Address).Info("dkron: removed server with duplicate address")
} else {
if err := future.Error(); err != nil {
return fmt.Errorf("error removing server with duplicate ID %q: %s", server.ID, err)
}
a.logger.WithField("server", server.ID).Info("dkron: removed server with duplicate ID")
}
}
}
Expand All @@ -315,6 +311,15 @@ func (a *Agent) addRaftPeer(m serf.Member, parts *ServerParts) error {
// removeRaftPeer is used to remove a Raft peer when a dkron server leaves
// or is reaped
func (a *Agent) removeRaftPeer(m serf.Member, parts *ServerParts) error {

// Do not remove ourself. This can only happen if the current leader
// is leaving. Instead, we should allow a follower to take-over and
// deregister us later.
if strings.EqualFold(m.Name, a.config.NodeName) {
a.logger.Warn("removing self should be done by follower", "name", a.config.NodeName)
return nil
}

// See if it's already in the configuration. It's harmless to re-remove it
// but we want to avoid doing that if possible to prevent useless Raft
// log entries.
Expand Down
63 changes: 60 additions & 3 deletions dkron/serf.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package dkron

import (
"strings"
"time"

"github.com/hashicorp/raft"
"github.com/hashicorp/serf/serf"
Expand All @@ -11,6 +12,9 @@ const (
// StatusReap is used to update the status of a node if we
// are handling a EventMemberReap
StatusReap = serf.MemberStatus(-1)

// maxPeerRetries limits how many invalidate attempts are made
maxPeerRetries = 6
)

// nodeJoin is used to handle join events on the serf cluster
Expand All @@ -21,8 +25,8 @@ func (a *Agent) nodeJoin(me serf.MemberEvent) {
a.logger.WithField("member", m.Name).Warn("non-server in gossip pool")
continue
}
a.logger.WithField("server", parts.Name).Info("adding server")

a.logger.WithField("server", parts.Name).Info("Adding LAN adding server")
a.serverLookup.AddServer(parts)
// Check if this server is known
found := false
a.peerLock.Lock()
Expand Down Expand Up @@ -110,7 +114,46 @@ func (a *Agent) maybeBootstrap() {
return
}

// TODO: Query each of the servers and make sure they report no Raft peers.
// Query each of the servers and make sure they report no Raft peers.
for _, server := range servers {
var peers []string

// Retry with exponential backoff to get peer status from this server
for attempt := uint(0); attempt < maxPeerRetries; attempt++ {
configuration, err := a.GRPCClient.RaftGetConfiguration(server.RPCAddr.String())
if err != nil {
nextRetry := (1 << attempt) * time.Second
a.logger.Error("Failed to confirm peer status for server (will retry).",
"server", server.Name,
"retry_interval", nextRetry.String(),
"error", err,
)
time.Sleep(nextRetry)
} else {
for _, peer := range configuration.Servers {
peers = append(peers, peer.Id)
}
break
}
}

// Found a node with some Raft peers, stop bootstrap since there's
// evidence of an existing cluster. We should get folded in by the
// existing servers if that's the case, so it's cleaner to sit as a
// candidate with no peers so we don't cause spurious elections.
// It's OK this is racy, because even with an initial bootstrap
// as long as one peer runs bootstrap things will work, and if we
// have multiple peers bootstrap in the same way, that's OK. We
// just don't want a server added much later to do a live bootstrap
// and interfere with the cluster. This isn't required for Raft's
// correctness because no server in the existing cluster will vote
// for this server, but it makes things much more stable.
if len(peers) > 0 {
a.logger.Info("Existing Raft peers reported by server, disabling bootstrap mode", "server", server.Name)
a.config.BootstrapExpect = 0
return
}
}

// Update the peer set
// Attempt a live bootstrap!
Expand Down Expand Up @@ -174,6 +217,7 @@ func (a *Agent) nodeFailed(me serf.MemberEvent) {
delete(a.localPeers, raft.ServerAddress(parts.Addr.String()))
}
a.peerLock.Unlock()
a.serverLookup.RemoveServer(parts)
}
}

Expand All @@ -200,3 +244,16 @@ func (a *Agent) localMemberEvent(me serf.MemberEvent) {
}
}
}

// lanNodeUpdate handles serf member-update events by refreshing the
// entry recorded for each known server in the server lookup table, so
// the raft transport can resolve a server ID to its current address.
func (a *Agent) lanNodeUpdate(me serf.MemberEvent) {
	for _, member := range me.Members {
		isSrv, parts := isServer(member)
		if !isSrv {
			// Non-server members carry no RPC address to track.
			continue
		}
		a.logger.WithField("server", parts.String()).Info("Updating LAN server")

		// Re-adding overwrites any stale address for this server ID.
		a.serverLookup.AddServer(parts)
	}
}
76 changes: 76 additions & 0 deletions dkron/server_lookup.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
package dkron

import (
"fmt"
"sync"

"github.com/hashicorp/raft"
)

// ServerLookup encapsulates looking up servers by id and address
type ServerLookup struct {
	// lock guards both maps below; reads take RLock, mutations take Lock.
	lock sync.RWMutex
	// addressToServer indexes servers by their RPC address string.
	addressToServer map[raft.ServerAddress]*ServerParts
	// idToServer indexes the same servers by raft server ID.
	idToServer map[raft.ServerID]*ServerParts
}

// NewServerLookup returns an empty ServerLookup ready for use.
func NewServerLookup() *ServerLookup {
	// The zero value of sync.RWMutex is ready to use, so only the two
	// maps need explicit initialization.
	return &ServerLookup{
		addressToServer: make(map[raft.ServerAddress]*ServerParts),
		idToServer:      make(map[raft.ServerID]*ServerParts),
	}
}

// AddServer registers server under both its RPC address and its raft ID,
// replacing any previous entry stored under either key.
func (sl *ServerLookup) AddServer(server *ServerParts) {
	sl.lock.Lock()
	defer sl.lock.Unlock()

	addr := raft.ServerAddress(server.RPCAddr.String())
	id := raft.ServerID(server.ID)
	sl.addressToServer[addr] = server
	sl.idToServer[id] = server
}

// RemoveServer drops server from both indexes. Removing a server that
// was never added is a harmless no-op.
func (sl *ServerLookup) RemoveServer(server *ServerParts) {
	sl.lock.Lock()
	defer sl.lock.Unlock()

	addr := raft.ServerAddress(server.RPCAddr.String())
	id := raft.ServerID(server.ID)
	delete(sl.addressToServer, addr)
	delete(sl.idToServer, id)
}

// ServerAddr returns the RPC address registered for the server with the
// given raft ID, or an error when the ID is unknown.
//
// Implements the raft ServerAddressProvider interface, which the raft
// network transport uses to resolve server IDs to current addresses.
func (sl *ServerLookup) ServerAddr(id raft.ServerID) (raft.ServerAddress, error) {
	sl.lock.RLock()
	defer sl.lock.RUnlock()
	svr, ok := sl.idToServer[id]
	if !ok {
		// Error strings are lowercase per Go convention (staticcheck ST1005);
		// %q quotes the id so an empty or whitespace ID is still visible.
		return "", fmt.Errorf("could not find address for server id %q", id)
	}
	return raft.ServerAddress(svr.RPCAddr.String()), nil
}

// Server looks up the server registered under the given RPC address.
// It returns nil when no server is known at that address (the previous
// comment incorrectly described a boolean return).
func (sl *ServerLookup) Server(addr raft.ServerAddress) *ServerParts {
	sl.lock.RLock()
	defer sl.lock.RUnlock()
	return sl.addressToServer[addr]
}

// Servers returns a snapshot of every registered server. The order is
// unspecified because it follows Go's randomized map iteration.
func (sl *ServerLookup) Servers() []*ServerParts {
	sl.lock.RLock()
	defer sl.lock.RUnlock()
	// Pre-size the slice to avoid repeated growth while copying.
	ret := make([]*ServerParts, 0, len(sl.addressToServer))
	for _, svr := range sl.addressToServer {
		ret = append(ret, svr)
	}
	return ret
}

// CheckServers applies fn to each registered server under a read lock,
// stopping early as soon as fn returns false.
func (sl *ServerLookup) CheckServers(fn func(srv *ServerParts) bool) {
	sl.lock.RLock()
	defer sl.lock.RUnlock()

	for _, srv := range sl.addressToServer {
		if keepGoing := fn(srv); !keepGoing {
			break
		}
	}
}
91 changes: 91 additions & 0 deletions dkron/server_lookup_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
package dkron

import (
"testing"

"github.com/hashicorp/raft"
"github.com/stretchr/testify/require"
)

// testAddr is a minimal net.Addr implementation used to construct
// ServerParts values in tests.
type testAddr struct {
	// addr is the literal address String() reports.
	addr string
}

// Network reports a fixed network type for the test address.
func (ta *testAddr) Network() string {
	return "tcp"
}

// String returns the configured address verbatim.
func (ta *testAddr) String() string {
	return ta.addr
}

// TestAddServer verifies that added servers are retrievable both by
// raft ID and by RPC address.
func TestAddServer(t *testing.T) {
	// arrange
	lookup := NewServerLookup()
	id1, addr1 := "server-1", "127.0.0.1:8300"
	id2, addr2 := "server-2", "127.0.0.2:8300"
	server1, server2 := buildServerParts(id1, addr1), buildServerParts(id2, addr2)

	// act
	lookup.AddServer(server1)
	lookup.AddServer(server2)

	// assert
	// Servers() iterates a Go map, so its order is nondeterministic;
	// compare as an unordered set to avoid a flaky test.
	servers := lookup.Servers()
	expectedServers := []*ServerParts{server1, server2}
	require.ElementsMatchf(t, expectedServers, servers, "Expected %v but got %v", expectedServers, servers)

	got, err := lookup.ServerAddr(raft.ServerID(id1))
	require.NoErrorf(t, err, "Unexpected error: %v", err)
	require.EqualValuesf(t, addr1, string(got), "Expected %v but got %v", addr1, got)

	server := lookup.Server(raft.ServerAddress(addr1))
	strAddr := server.RPCAddr.String()
	require.EqualValuesf(t, addr1, strAddr, "Expected lookup to return address %v but got %v", addr1, strAddr)

	got, err = lookup.ServerAddr(raft.ServerID(id2))
	require.NoErrorf(t, err, "Unexpected error: %v", err)
	require.EqualValuesf(t, addr2, string(got), "Expected %v but got %v", addr2, got)

	server = lookup.Server(raft.ServerAddress(addr2))
	strAddr = server.RPCAddr.String()
	require.EqualValuesf(t, addr2, strAddr, "Expected lookup to return address %v but got %v", addr2, strAddr)
}

// TestRemoveServer verifies that removing a server clears both the
// address and the ID index while leaving other entries intact.
func TestRemoveServer(t *testing.T) {
	// arrange: register two servers, then remove the first.
	lookup := NewServerLookup()
	idGone, addrGone := "server-1", "127.0.0.1:8300"
	idKept, addrKept := "server-2", "127.0.0.2:8300"
	removed := buildServerParts(idGone, addrGone)
	kept := buildServerParts(idKept, addrKept)
	lookup.AddServer(removed)
	lookup.AddServer(kept)

	// act
	lookup.RemoveServer(removed)

	// assert: only the kept server remains visible.
	servers := lookup.Servers()
	expectedServers := []*ServerParts{kept}
	require.EqualValuesf(t, expectedServers, servers, "Expected %v but got %v", expectedServers, servers)

	require.Nilf(t, lookup.Server(raft.ServerAddress(addrGone)), "Expected lookup to return nil")
	addr, err := lookup.ServerAddr(raft.ServerID(idGone))
	require.Errorf(t, err, "Expected lookup to return error")
	require.EqualValuesf(t, "", string(addr), "Expected empty address but got %v", addr)

	got, err := lookup.ServerAddr(raft.ServerID(idKept))
	require.NoErrorf(t, err, "Unexpected error: %v", err)
	require.EqualValuesf(t, addrKept, string(got), "Expected %v but got %v", addrKept, got)

	keptSrv := lookup.Server(raft.ServerAddress(addrKept))
	gotAddr := keptSrv.RPCAddr.String()
	require.EqualValuesf(t, addrKept, gotAddr, "Expected lookup to return address %v but got %v", addrKept, gotAddr)
}

// buildServerParts constructs a minimal ServerParts carrying only the
// raft ID and RPC address needed by the lookup tests.
func buildServerParts(id, addr string) *ServerParts {
	parts := &ServerParts{
		ID:      id,
		RPCAddr: &testAddr{addr: addr},
	}
	return parts
}
Loading