diff --git a/dkron/agent.go b/dkron/agent.go index 10eb440d9..d95339c17 100644 --- a/dkron/agent.go +++ b/dkron/agent.go @@ -22,7 +22,6 @@ import ( "github.com/hashicorp/raft" raftboltdb "github.com/hashicorp/raft-boltdb" "github.com/hashicorp/serf/serf" - "github.com/juliangruber/go-intersect" "github.com/sirupsen/logrus" "github.com/soheilhy/cmux" "google.golang.org/grpc" @@ -125,7 +124,7 @@ type Plugins struct { // AgentOption type that defines agent options type AgentOption func(agent *Agent) -// NewAgent return a new Agent instace capable of starting +// NewAgent returns a new Agent instance capable of starting // and running a Dkron instance. func NewAgent(config *Config, options ...AgentOption) *Agent { agent := &Agent{ @@ -680,8 +679,7 @@ func (a *Agent) join(addrs []string, replay bool) (n int, err error) { } func (a *Agent) processFilteredNodes(job *Job) (map[string]string, map[string]string, error) { - // candidates will contain a set of candidates by tags - // the final set of nodes will be the intesection of all groups + // The final set of nodes will be the intersection of all groups tags := make(map[string]string) // Actually copy the map @@ -693,80 +691,93 @@ func (a *Agent) processFilteredNodes(job *Job) (map[string]string, map[string]st // on the same region. tags["region"] = a.config.Region - candidates := [][]string{} + // Make a set of all members + execNodes := make(map[string]serf.Member) + for _, member := range a.serf.Members() { + if member.Status == serf.StatusAlive { + execNodes[member.Name] = member + } + } + + execNodes, tags, cardinality, err := filterNodes(execNodes, tags) + if err != nil { + return nil, nil, err + } + + // Create an array of node names to aid in computing resulting set based on cardinality + var nameIndex []string + for name := range execNodes { + nameIndex = append(nameIndex, name) + } + + nodes := make(map[string]string) + rand.Seed(time.Now().UnixNano()) + for ; cardinality > 0; cardinality-- { + // Pick a node, any node + randomIndex := rand.Intn(cardinality) + m := execNodes[nameIndex[randomIndex]] + + // Store name and address + if addr, ok := m.Tags["rpc_addr"]; ok { + nodes[m.Name] = addr + } else { + nodes[m.Name] = m.Addr.String() + } + + // Swap picked node with the first one and shorten array, so node can't get picked again + nameIndex[randomIndex], nameIndex[0] = nameIndex[0], nameIndex[randomIndex] + nameIndex = nameIndex[1:] + } + + return nodes, tags, nil +} + +// filterNodes determines which of the execNodes have the given tags +// Out param! The incoming execNodes map is modified. +// Returns: +// * the (modified) map of execNodes +// * a map of tag values without cardinality +// * cardinality, i.e. the max number of nodes that should be targeted, regardless of the +// number of nodes in the resulting map. +// * an error if a cardinality was malformed +func filterNodes(execNodes map[string]serf.Member, tags map[string]string) (map[string]serf.Member, map[string]string, int, error) { + cardinality := int(^uint(0) >> 1) // MaxInt + + cleanTags := make(map[string]string) + + // Filter nodes that lack tags + // Determine lowest cardinality along the way for jtk, jtv := range tags { - cans := []string{} tc := strings.Split(jtv, ":") - tv := tc[0] // Set original tag to clean tag - tags[jtk] = tv - - for _, member := range a.serf.Members() { - if member.Status == serf.StatusAlive { - for mtk, mtv := range member.Tags { - if mtk == jtk && mtv == tv { - cans = append(cans, member.Name) - } - } + cleanTags[jtk] = tv + + // Remove nodes that do not have the selected tags + for name, member := range execNodes { + if mtv, tagPresent := member.Tags[jtk]; !tagPresent || mtv != tv { + delete(execNodes, name) } } - // In case there is cardinality in the tag, randomize the order and select the amount of nodes - // or else just add all nodes to the result. if len(tc) == 2 { - f := []string{} - rand.Seed(time.Now().UnixNano()) - rand.Shuffle(len(cans), func(i, j int) { - cans[i], cans[j] = cans[j], cans[i] - }) - - count, err := strconv.Atoi(tc[1]) + tagCardinality, err := strconv.Atoi(tc[1]) if err != nil { - return nil, nil, err + return nil, nil, 0, err } - for i := 1; i <= count; i++ { - if len(cans) == 0 { - break - } - f = append(f, cans[0]) - cans = cans[1:] + if tagCardinality < cardinality { + cardinality = tagCardinality } - cans = f - } - - candidates = append(candidates, cans) - } - - // The final result will be the intersection of all candidates. - nodes := make(map[string]string) - r := candidates[0] - for i := 1; i <= len(candidates)-1; i++ { - isec := intersect.Simple(r, candidates[i]).([]interface{}) - // Empty the slice - r = []string{} - - // Refill with the intersection - for _, v := range isec { - r = append(r, v.(string)) } } - for _, n := range r { - for _, m := range a.serf.Members() { - if n == m.Name { - // If the server is missing the rpc_addr tag, default to the serf advertise addr - if addr, ok := m.Tags["rpc_addr"]; ok { - nodes[n] = addr - } else { - nodes[n] = m.Addr.String() - } - } - } + // limit the cardinality to the number of possible nodes + if len(execNodes) < cardinality { + cardinality = len(execNodes) } - return nodes, tags, nil + return execNodes, cleanTags, cardinality, nil } // This function is called when a client request the RPCAddress diff --git a/dkron/agent_test.go b/dkron/agent_test.go index 400888a92..48413b6b8 100644 --- a/dkron/agent_test.go +++ b/dkron/agent_test.go @@ -121,8 +121,10 @@ func Test_processFilteredNodes(t *testing.T) { c.Server = true c.LogLevel = logLevel c.Tags = map[string]string{ - "tag": "test", - "region": "global", + "tag": "test", + "region": "global", + "additional": "value", + "additional2": "value2", } c.DevMode = true c.DataDir = dir @@ -140,9 +142,11 @@ func Test_processFilteredNodes(t *testing.T) { c.Server = true c.LogLevel = logLevel c.Tags = map[string]string{ - "tag": "test", - "extra": "tag", - "region": "global", + "tag": "test", + "extra": "tag", + "region": "global", + "additional": "value", + "additional2": "value2", } c.DevMode = true c.DataDir = dir @@ -162,9 +166,11 @@ func Test_processFilteredNodes(t *testing.T) { c.Server = false c.LogLevel = logLevel c.Tags = map[string]string{ - "tag": "test_client", - "extra": "tag", - "region": "global", + "tag": "test_client", + "extra": "tag", + "region": "global", + "additional": "value", + "additional2": "value2", } c.DevMode = true c.DataDir = dir @@ -174,6 +180,7 @@ func Test_processFilteredNodes(t *testing.T) { time.Sleep(2 * time.Second) + // Test cardinality of 2 returns correct nodes job := &Job{ Name: "test_job_1", Tags: map[string]string{ @@ -189,6 +196,7 @@ func Test_processFilteredNodes(t *testing.T) { assert.Len(t, nodes, 2) assert.Equal(t, tags["tag"], "test") + // Test cardinality of 1 with two qualified nodes returns 1 node job2 := &Job{ Name: "test_job_2", Tags: map[string]string{ @@ -201,6 +209,7 @@ func Test_processFilteredNodes(t *testing.T) { assert.Len(t, nodes, 1) + // Test no cardinality specified, all nodes returned job3 := &Job{ Name: "test_job_3", } @@ -213,6 +222,7 @@ func Test_processFilteredNodes(t *testing.T) { assert.Contains(t, nodes, "test2") assert.Contains(t, nodes, "test3") + // Test exclusive tag returns correct node job4 := &Job{ Name: "test_job_4", Tags: map[string]string{ @@ -226,6 +236,7 @@ func Test_processFilteredNodes(t *testing.T) { assert.Len(t, nodes, 1) assert.Contains(t, nodes, "test3") + // Test existing tag but no matching value returns no nodes job5 := &Job{ Name: "test_job_5", Tags: map[string]string{ @@ -238,6 +249,7 @@ func Test_processFilteredNodes(t *testing.T) { assert.Len(t, nodes, 0) + // Test 1 matching and 1 not matching tag returns no nodes job6 := &Job{ Name: "test_job_6", Tags: map[string]string{ @@ -252,6 +264,7 @@ func Test_processFilteredNodes(t *testing.T) { assert.Len(t, nodes, 0) assert.Equal(t, tags["tag"], "test") + // Test matching tags with cardinality of 2 but only 1 matching node returns correct node job7 := &Job{ Name: "test_job_7", Tags: map[string]string{ @@ -268,6 +281,28 @@ func Test_processFilteredNodes(t *testing.T) { assert.Equal(t, tags["tag"], "test") assert.Equal(t, tags["extra"], "tag") + // Test two tags matching same 3 servers and cardinality of 1 should always return 1 server + + // Do this 10 times: an old bug caused this to sometimes succeed and sometimes fail due to the use of math.rand + // Statistically, with 10 tries about 3 should succeed and the rest should fail, if the code is buggy. + for i := 0; i < 10; i++ { + job8 := &Job{ + Name: "test_job_7", + Tags: map[string]string{ + "additional": "value:1", + "additional2": "value2:1", + }, + } + + nodes, tags, err = a1.processFilteredNodes(job8) + require.NoError(t, err) + + assert.Len(t, nodes, 1) + assert.Equal(t, tags["additional"], "value") + assert.Equal(t, tags["additional2"], "value2") + } + + // Clean up a1.Stop() a2.Stop() a3.Stop()