Skip to content

Commit

Permalink
Fix multitag cardinality bug
Browse files Browse the repository at this point in the history
  • Loading branch information
yvanoers committed Nov 9, 2020
1 parent e805b7a commit 608c02f
Show file tree
Hide file tree
Showing 2 changed files with 115 additions and 69 deletions.
133 changes: 72 additions & 61 deletions dkron/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ import (
"github.com/hashicorp/raft"
raftboltdb "github.com/hashicorp/raft-boltdb"
"github.com/hashicorp/serf/serf"
"github.com/juliangruber/go-intersect"
"github.com/sirupsen/logrus"
"github.com/soheilhy/cmux"
"google.golang.org/grpc"
Expand Down Expand Up @@ -125,7 +124,7 @@ type Plugins struct {
// AgentOption type that defines agent options
type AgentOption func(agent *Agent)

// NewAgent return a new Agent instace capable of starting
// NewAgent returns a new Agent instance capable of starting
// and running a Dkron instance.
func NewAgent(config *Config, options ...AgentOption) *Agent {
agent := &Agent{
Expand Down Expand Up @@ -680,8 +679,7 @@ func (a *Agent) join(addrs []string, replay bool) (n int, err error) {
}

func (a *Agent) processFilteredNodes(job *Job) (map[string]string, map[string]string, error) {
// candidates will contain a set of candidates by tags
// the final set of nodes will be the intesection of all groups
// The final set of nodes will be the intersection of all groups
tags := make(map[string]string)

// Actually copy the map
Expand All @@ -693,80 +691,93 @@ func (a *Agent) processFilteredNodes(job *Job) (map[string]string, map[string]st
// on the same region.
tags["region"] = a.config.Region

candidates := [][]string{}
// Make a set of all alive members
execNodes := make(map[string]serf.Member)
for _, member := range a.serf.Members() {
if member.Status == serf.StatusAlive {
execNodes[member.Name] = member
}
}

execNodes, tags, cardinality, err := filterNodes(execNodes, tags)
if err != nil {
return nil, nil, err
}

// Create an array of node names to aid in computing resulting set based on cardinality
var nameIndex []string
for name := range execNodes {
nameIndex = append(nameIndex, name)
}

nodes := make(map[string]string)
rand.Seed(time.Now().UnixNano())
for ; cardinality > 0; cardinality-- {
// Pick a node, any node
randomIndex := rand.Intn(cardinality)
m := execNodes[nameIndex[randomIndex]]

// Store name and address
if addr, ok := m.Tags["rpc_addr"]; ok {
nodes[m.Name] = addr
} else {
nodes[m.Name] = m.Addr.String()
}

// Swap picked node with the first one and shorten array, so node can't get picked again
nameIndex[randomIndex], nameIndex[0] = nameIndex[0], nameIndex[randomIndex]
nameIndex = nameIndex[1:]
}

return nodes, tags, nil
}

// filterNodes determines which of the execNodes have the given tags
// Note: execNodes is an in/out parameter; the incoming map is modified in place.
// Returns:
// * the (modified) map of execNodes
// * a map of tag values without cardinality
// * cardinality, i.e. the max number of nodes that should be targeted, regardless of the
// number of nodes in the resulting map.
// * an error if a cardinality was malformed
func filterNodes(execNodes map[string]serf.Member, tags map[string]string) (map[string]serf.Member, map[string]string, int, error) {
cardinality := int(^uint(0) >> 1) // MaxInt

cleanTags := make(map[string]string)

// Filter nodes that lack tags
// Determine lowest cardinality along the way
for jtk, jtv := range tags {
cans := []string{}
tc := strings.Split(jtv, ":")

tv := tc[0]

// Set original tag to clean tag
tags[jtk] = tv

for _, member := range a.serf.Members() {
if member.Status == serf.StatusAlive {
for mtk, mtv := range member.Tags {
if mtk == jtk && mtv == tv {
cans = append(cans, member.Name)
}
}
cleanTags[jtk] = tv

// Remove nodes that do not have the selected tags
for name, member := range execNodes {
if mtv, tagPresent := member.Tags[jtk]; !tagPresent || mtv != tv {
delete(execNodes, name)
}
}

// In case there is cardinality in the tag, randomize the order and select the amount of nodes
// or else just add all nodes to the result.
if len(tc) == 2 {
f := []string{}
rand.Seed(time.Now().UnixNano())
rand.Shuffle(len(cans), func(i, j int) {
cans[i], cans[j] = cans[j], cans[i]
})

count, err := strconv.Atoi(tc[1])
tagCardinality, err := strconv.Atoi(tc[1])
if err != nil {
return nil, nil, err
return nil, nil, 0, err
}
for i := 1; i <= count; i++ {
if len(cans) == 0 {
break
}
f = append(f, cans[0])
cans = cans[1:]
if tagCardinality < cardinality {
cardinality = tagCardinality
}
cans = f
}

candidates = append(candidates, cans)
}

// The final result will be the intersection of all candidates.
nodes := make(map[string]string)
r := candidates[0]
for i := 1; i <= len(candidates)-1; i++ {
isec := intersect.Simple(r, candidates[i]).([]interface{})
// Empty the slice
r = []string{}

// Refill with the intersection
for _, v := range isec {
r = append(r, v.(string))
}
}

for _, n := range r {
for _, m := range a.serf.Members() {
if n == m.Name {
// If the server is missing the rpc_addr tag, default to the serf advertise addr
if addr, ok := m.Tags["rpc_addr"]; ok {
nodes[n] = addr
} else {
nodes[n] = m.Addr.String()
}
}
}
// limit the cardinality to the number of possible nodes
if len(execNodes) < cardinality {
cardinality = len(execNodes)
}

return nodes, tags, nil
return execNodes, cleanTags, cardinality, nil
}

// This function is called when a client requests the RPCAddress
Expand Down
51 changes: 43 additions & 8 deletions dkron/agent_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -121,8 +121,10 @@ func Test_processFilteredNodes(t *testing.T) {
c.Server = true
c.LogLevel = logLevel
c.Tags = map[string]string{
"tag": "test",
"region": "global",
"tag": "test",
"region": "global",
"additional": "value",
"additional2": "value2",
}
c.DevMode = true
c.DataDir = dir
Expand All @@ -140,9 +142,11 @@ func Test_processFilteredNodes(t *testing.T) {
c.Server = true
c.LogLevel = logLevel
c.Tags = map[string]string{
"tag": "test",
"extra": "tag",
"region": "global",
"tag": "test",
"extra": "tag",
"region": "global",
"additional": "value",
"additional2": "value2",
}
c.DevMode = true
c.DataDir = dir
Expand All @@ -162,9 +166,11 @@ func Test_processFilteredNodes(t *testing.T) {
c.Server = false
c.LogLevel = logLevel
c.Tags = map[string]string{
"tag": "test_client",
"extra": "tag",
"region": "global",
"tag": "test_client",
"extra": "tag",
"region": "global",
"additional": "value",
"additional2": "value2",
}
c.DevMode = true
c.DataDir = dir
Expand All @@ -174,6 +180,7 @@ func Test_processFilteredNodes(t *testing.T) {

time.Sleep(2 * time.Second)

// Test cardinality of 2 returns correct nodes
job := &Job{
Name: "test_job_1",
Tags: map[string]string{
Expand All @@ -189,6 +196,7 @@ func Test_processFilteredNodes(t *testing.T) {
assert.Len(t, nodes, 2)
assert.Equal(t, tags["tag"], "test")

// Test cardinality of 1 with two qualified nodes returns 1 node
job2 := &Job{
Name: "test_job_2",
Tags: map[string]string{
Expand All @@ -201,6 +209,7 @@ func Test_processFilteredNodes(t *testing.T) {

assert.Len(t, nodes, 1)

// Test no cardinality specified, all nodes returned
job3 := &Job{
Name: "test_job_3",
}
Expand All @@ -213,6 +222,7 @@ func Test_processFilteredNodes(t *testing.T) {
assert.Contains(t, nodes, "test2")
assert.Contains(t, nodes, "test3")

// Test exclusive tag returns correct node
job4 := &Job{
Name: "test_job_4",
Tags: map[string]string{
Expand All @@ -226,6 +236,7 @@ func Test_processFilteredNodes(t *testing.T) {
assert.Len(t, nodes, 1)
assert.Contains(t, nodes, "test3")

// Test existing tag but no matching value returns no nodes
job5 := &Job{
Name: "test_job_5",
Tags: map[string]string{
Expand All @@ -238,6 +249,7 @@ func Test_processFilteredNodes(t *testing.T) {

assert.Len(t, nodes, 0)

// Test 1 matching and 1 not matching tag returns no nodes
job6 := &Job{
Name: "test_job_6",
Tags: map[string]string{
Expand All @@ -252,6 +264,7 @@ func Test_processFilteredNodes(t *testing.T) {
assert.Len(t, nodes, 0)
assert.Equal(t, tags["tag"], "test")

// Test matching tags with cardinality of 2 but only 1 matching node returns correct node
job7 := &Job{
Name: "test_job_7",
Tags: map[string]string{
Expand All @@ -268,6 +281,28 @@ func Test_processFilteredNodes(t *testing.T) {
assert.Equal(t, tags["tag"], "test")
assert.Equal(t, tags["extra"], "tag")

// Test two tags matching same 3 servers and cardinality of 1 should always return 1 server

// Do this 10 times: an old bug caused this to sometimes succeed and sometimes fail due to the use of math/rand.
// If the code is buggy, statistically about 3 of the 10 tries should succeed and the rest should fail.
for i := 0; i < 10; i++ {
job8 := &Job{
Name: "test_job_7",
Tags: map[string]string{
"additional": "value:1",
"additional2": "value2:1",
},
}

nodes, tags, err = a1.processFilteredNodes(job8)
require.NoError(t, err)

assert.Len(t, nodes, 1)
assert.Equal(t, tags["additional"], "value")
assert.Equal(t, tags["additional2"], "value2")
}

// Clean up
a1.Stop()
a2.Stop()
a3.Stop()
Expand Down

0 comments on commit 608c02f

Please sign in to comment.