Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix intermittent test false negative #982

Merged
merged 17 commits into from
Jun 16, 2021
Merged
Show file tree
Hide file tree
Changes from 16 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,9 @@ doc:
test:
@bash --norc -i ./scripts/test

localtest:
go test -v ./... | sed ''/PASS/s//$$(printf "\033[32mPASS\033[0m")/'' | sed ''/FAIL/s//$$(printf "\033[31mFAIL\033[0m")/''

updatetestcert:
wget https://badssl.com/certs/badssl.com-client.p12 -q -O badssl.com-client.p12
openssl pkcs12 -in badssl.com-client.p12 -nocerts -nodes -passin pass:badssl.com -out builtin/bins/dkron-executor-http/testdata/badssl.com-client-key-decrypted.pem
Expand Down
8 changes: 1 addition & 7 deletions cmd/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,14 +98,8 @@ WAIT:
goto WAIT
}

// Check if we should do a graceful leave
graceful := false
if sig == syscall.SIGTERM || sig == os.Interrupt {
graceful = true
}

// Fail fast if not doing a graceful leave
if !graceful {
if sig != syscall.SIGTERM && sig != os.Interrupt {
vcastellm marked this conversation as resolved.
Show resolved Hide resolved
return 1
}

Expand Down
137 changes: 42 additions & 95 deletions dkron/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,9 @@ var (
runningExecutions sync.Map
)

// Node is a shorter, more descriptive name for serf.Member.
// It is declared as a type alias, so Node and serf.Member denote the same type.
type Node = serf.Member

// Agent is the main struct that represents a dkron agent
type Agent struct {
// ProcessorPlugins maps processor plugins
Expand Down Expand Up @@ -591,33 +594,6 @@ func (a *Agent) IsLeader() bool {
return a.raft.State() == raft.Leader
}

// Members returns the members of the serf cluster, as reported by the
// agent's serf instance.
func (a *Agent) Members() []serf.Member {
return a.serf.Members()
}

// LocalMember returns the local node, as reported by the agent's serf
// instance.
func (a *Agent) LocalMember() serf.Member {
return a.serf.LocalMember()
}

// Leader is used to return the Raft leader
func (a *Agent) Leader() raft.ServerAddress {
yvanoers marked this conversation as resolved.
Show resolved Hide resolved
return a.raft.Leader()
}

// Servers returns a list of known servers
func (a *Agent) Servers() (members []*ServerParts) {
yvanoers marked this conversation as resolved.
Show resolved Hide resolved
for _, member := range a.serf.Members() {
ok, parts := isServer(member)
if !ok || member.Status != serf.StatusAlive {
continue
}
members = append(members, parts)
}
return members
}

// LocalServers returns a list of the locally known servers
func (a *Agent) LocalServers() (members []*ServerParts) {
for _, member := range a.serf.Members() {
Expand Down Expand Up @@ -692,87 +668,58 @@ func (a *Agent) join(addrs []string, replay bool) (n int, err error) {
return
}

func (a *Agent) processFilteredNodes(job *Job) (map[string]string, map[string]string, error) {
// The final set of nodes will be the intersection of all groups
tags := make(map[string]string)

// Actually copy the map
for key, val := range job.Tags {
tags[key] = val
}

// Always filter by region tag as we currently only target nodes
// on the same region.
tags["region"] = a.config.Region
// getTargetNodes returns the nodes chosen as targets for the given tags.
//
// The tags are first passed through cleanTags, which yields the bare tags and
// a cardinality. The current serf members are then narrowed down with
// getQualifyingNodes, and selectNodes picks from the remaining candidates
// using selectFunc and that cardinality.
func (a *Agent) getTargetNodes(tags map[string]string, selectFunc func([]Node) int) []Node {
	cleaned, limit := cleanTags(tags, a.logger)
	candidates := a.getQualifyingNodes(a.serf.Members(), cleaned)
	return selectNodes(candidates, limit, selectFunc)
}

// Make a set of all members
allNodes := make(map[string]serf.Member)
for _, member := range a.serf.Members() {
if member.Status == serf.StatusAlive {
allNodes[member.Name] = member
}
}
// getQualifyingNodes returns the subset of nodes that are alive, carry this
// agent's region in their "region" tag and match all of bareTags.
//
// The slice is handed to filterArray as-is, so callers should not rely on
// the contents of nodes afterwards.
func (a *Agent) getQualifyingNodes(nodes []Node, bareTags map[string]string) []Node {
	// The checks run in the same order as a short-circuiting && chain:
	// liveness first, then region, then the tag match.
	qualifies := func(candidate Node) bool {
		if candidate.Status != serf.StatusAlive {
			return false
		}
		if candidate.Tags["region"] != a.config.Region {
			return false
		}
		return nodeMatchesTags(candidate, bareTags)
	}
	return filterArray(nodes, qualifies)
}

execNodes, tags, cardinality, err := filterNodes(allNodes, tags)
if err != nil {
return nil, nil, err
}
// defaultSelector is the default selector function for
// getTargetNodes/selectNodes. It returns a pseudo-random index into nodes.
//
// nodes must not be empty: rand.Intn panics when its argument is not
// positive.
func defaultSelector(nodes []Node) int {
	count := len(nodes)
	return rand.Intn(count)
}

// Create an array of node names to aid in computing resulting set based on cardinality
var names []string
for name := range execNodes {
names = append(names, name)
// selectNodes selects at most cardinality nodes from the given nodes, using selectFunc to pick each one
func selectNodes(nodes []Node, cardinality int, selectFunc func([]Node) int) []Node {
// Return all nodes immediately if they're all going to be selected
numNodes := len(nodes)
if numNodes <= cardinality {
return nodes
}

nodes := make(map[string]string)
for ; cardinality > 0; cardinality-- {
// Pick a node, any node
randomIndex := rand.Intn(len(names))
m := execNodes[names[randomIndex]]
// Select a node
chosenIndex := selectFunc(nodes[:numNodes])

// Store name and address
if addr, ok := m.Tags["rpc_addr"]; ok {
nodes[m.Name] = addr
} else {
nodes[m.Name] = m.Addr.String()
}

// Swap picked node with the first one and shorten array, so node can't get picked again
names[randomIndex], names[0] = names[0], names[randomIndex]
names = names[1:]
// Swap picked node with the last one and reduce choices so it can't get picked again
nodes[numNodes-1], nodes[chosenIndex] = nodes[chosenIndex], nodes[numNodes-1]
numNodes--
}

return nodes, tags, nil
return nodes[numNodes:]
}

// filterNodes determines which of the provided nodes have the given tags
// Returns:
// * the map of allNodes that match the provided tags
// * a clean map of tag values without cardinality
// * cardinality, i.e. the max number of nodes that should be targeted, regardless of the
// number of nodes in the resulting map.
// * an error if a cardinality was malformed
func filterNodes(allNodes map[string]serf.Member, tags map[string]string) (map[string]serf.Member, map[string]string, int, error) {
ct, cardinality, err := cleanTags(tags)
if err != nil {
return nil, nil, 0, err
}

matchingNodes := make(map[string]serf.Member)

// Filter nodes that lack tags
for name, member := range allNodes {
if nodeMatchesTags(member, ct) {
matchingNodes[name] = member
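// filterArray returns all items from arr for which filterFunc returns true.
// The filtering is done in place, so the contents and order of arr's backing array are changed.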
func filterArray(arr []Node, filterFunc func(Node) bool) []Node {
for i := len(arr) - 1; i >= 0; i-- {
if !filterFunc(arr[i]) {
arr[i] = arr[len(arr)-1]
arr = arr[:len(arr)-1]
}
}

// limit the cardinality to the number of possible nodes
if len(matchingNodes) < cardinality {
cardinality = len(matchingNodes)
}

return matchingNodes, ct, cardinality, nil
return arr
}

// This function is called when a client requests the RPCAddress
Expand Down
Loading