Skip to content

Commit

Permalink
metropolis: move curator client watches to curator/watcher
Browse files Browse the repository at this point in the history
This replaces all the ad-hoc code to watch Curator node(s) with calls
through the new curator/watcher library.

Change-Id: Ie2a82b330e4108b9b725515cb10595916c38b323
Reviewed-on: https://review.monogon.dev/c/monogon/+/2263
Tested-by: Jenkins CI
Reviewed-by: Lorenz Brun <lorenz@monogon.tech>
  • Loading branch information
q3k committed Nov 13, 2023
1 parent 3fd0977 commit 60461b2
Show file tree
Hide file tree
Showing 18 changed files with 374 additions and 639 deletions.
6 changes: 2 additions & 4 deletions metropolis/node/core/clusternet/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ go_library(
deps = [
"//metropolis/node",
"//metropolis/node/core/curator/proto/api",
"//metropolis/node/core/curator/watcher",
"//metropolis/node/core/localstorage",
"//metropolis/node/core/network",
"//metropolis/pkg/event",
Expand All @@ -27,10 +28,7 @@ go_library(

go_test(
name = "clusternet_test",
srcs = [
"clusternet_test.go",
"types_test.go",
],
srcs = ["clusternet_test.go"],
embed = [":clusternet"],
deps = [
"//metropolis/node",
Expand Down
91 changes: 40 additions & 51 deletions metropolis/node/core/clusternet/clusternet.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,9 +26,11 @@ import (
"fmt"
"net"
"net/netip"
"slices"

"github.com/cenkalti/backoff/v4"

"source.monogon.dev/metropolis/node/core/curator/watcher"
"source.monogon.dev/metropolis/node/core/localstorage"
"source.monogon.dev/metropolis/node/core/network"
"source.monogon.dev/metropolis/pkg/event"
Expand Down Expand Up @@ -176,58 +178,45 @@ func (s *Service) push(ctx context.Context, kubeC chan *Prefixes, netC chan *net
func (s *Service) pull(ctx context.Context) error {
supervisor.Signal(ctx, supervisor.SignalHealthy)

srv, err := s.Curator.Watch(ctx, &apb.WatchRequest{
Kind: &apb.WatchRequest_NodesInCluster_{
NodesInCluster: &apb.WatchRequest_NodesInCluster{},
var batch []*apb.Node
return watcher.WatchNodes(ctx, s.Curator, watcher.SimpleFollower{
FilterFn: func(a *apb.Node) bool {
if a.Clusternet == nil {
return false
}
if a.Clusternet.WireguardPubkey == "" {
return false
}
return true
},
})
if err != nil {
return fmt.Errorf("curator watch failed: %w", err)
}
defer srv.CloseSend()

nodes := newNodemap()
for {
ev, err := srv.Recv()
if err != nil {
return fmt.Errorf("curator watch recv failed: %w", err)
}

updated, removed := nodes.update(ctx, ev)

for _, n := range removed {
supervisor.Logger(ctx).Infof("Node %s removed, unconfiguring", n.id)
if err := s.wg.unconfigurePeer(n.copy()); err != nil {
// Do nothing and hope whatever caused this will go away at some point.
supervisor.Logger(ctx).Errorf("Node %s couldn't be unconfigured: %v", n.id, err)
EqualsFn: func(a *apb.Node, b *apb.Node) bool {
if a.Status.ExternalAddress != b.Status.ExternalAddress {
return false
}
}
var newNodes []*node
for _, n := range updated {
newNodes = append(newNodes, n.copy())
supervisor.Logger(ctx).Infof("Node %s updated: pk %s, address %s, prefixes %v", n.id, n.pubkey, n.address, n.prefixes)
}
succeeded := 0
if err := s.wg.configurePeers(newNodes); err != nil {
// If configuring all nodes at once failed, go node-by-node to make sure we've
// done as much as possible.
supervisor.Logger(ctx).Warningf("Bulk node update call failed, trying node-by-node..: %v", err)
for _, n := range newNodes {
if err := s.wg.configurePeers([]*node{n}); err != nil {
supervisor.Logger(ctx).Errorf("Node %s failed: %v", n.id, err)
} else {
succeeded += 1
}
if a.Clusternet.WireguardPubkey != b.Clusternet.WireguardPubkey {
return false
}
} else {
succeeded = len(newNodes)
}

if len(newNodes) != 0 {
supervisor.Logger(ctx).Infof("Successfully updated %d out of %d nodes", succeeded, len(newNodes))

numNodes, numPrefixes := nodes.stats()
supervisor.Logger(ctx).Infof("Total: %d nodes, %d prefixes.", numNodes, numPrefixes)
}
}
if !slices.Equal(a.Clusternet.Prefixes, b.Clusternet.Prefixes) {
return false
}
return true
},
OnNewUpdated: func(new *apb.Node) error {
batch = append(batch, new)
return nil
},
OnBatchDone: func() error {
if err := s.wg.configurePeers(batch); err != nil {
supervisor.Logger(ctx).Errorf("nodes couldn't be configured: %v", err)
}
batch = nil
return nil
},
OnDeleted: func(prev *apb.Node) error {
if err := s.wg.unconfigurePeer(prev); err != nil {
supervisor.Logger(ctx).Errorf("Node %s couldn't be unconfigured: %v", prev.Id, err)
}
return nil
},
})
}
Loading

0 comments on commit 60461b2

Please sign in to comment.