From 2a74f066e3967b100763e4e6f6da35bace868f02 Mon Sep 17 00:00:00 2001 From: Bowei Du Date: Wed, 20 May 2020 10:13:52 -0700 Subject: [PATCH] Wait for caches to sync before running node sync There can be a race condition where the queue somehow triggers before the caches are able to sync. Improve logging around the event for better debugging in the future. --- pkg/controller/node.go | 16 +++++++++++++++- pkg/instances/instances.go | 22 ++++++++++++++++------ 2 files changed, 31 insertions(+), 7 deletions(-) diff --git a/pkg/controller/node.go b/pkg/controller/node.go index 77b80a471a..52b4a05a3f 100644 --- a/pkg/controller/node.go +++ b/pkg/controller/node.go @@ -17,12 +17,15 @@ limitations under the License. package controller import ( + "time" + apiv1 "k8s.io/api/core/v1" listers "k8s.io/client-go/listers/core/v1" "k8s.io/client-go/tools/cache" "k8s.io/ingress-gce/pkg/context" "k8s.io/ingress-gce/pkg/instances" "k8s.io/ingress-gce/pkg/utils" + "k8s.io/klog" ) // NodeController synchronizes the state of the nodes to the unmanaged instance @@ -34,6 +37,9 @@ type NodeController struct { queue utils.TaskQueue // instancePool is a NodePool to manage kubernetes nodes. instancePool instances.NodePool + // hasSynced returns true if relevant caches have done their initial + // synchronization. + hasSynced func() bool } // NewNodeController returns a new node update controller. @@ -41,6 +47,7 @@ func NewNodeController(ctx *context.ControllerContext, instancePool instances.No c := &NodeController{ lister: ctx.NodeInformer.GetIndexer(), instancePool: instancePool, + hasSynced: ctx.HasSynced, } c.queue = utils.NewPeriodicTaskQueue("", "nodes", c.sync) @@ -60,8 +67,15 @@ func NewNodeController(ctx *context.ControllerContext, instancePool instances.No return c } -// Run a goroutine to process updates for the controller. +// Run the queue to process updates for the controller. This must be run in a +// separate goroutine (method will block until queue shutdown). func (c *NodeController) Run() { + start := time.Now() + for !c.hasSynced() { + klog.V(2).Infof("Waiting for hasSynced (%s elapsed)", time.Now().Sub(start)) + time.Sleep(1 * time.Second) + } + klog.V(2).Infof("Caches synced (took %s)", time.Now().Sub(start)) c.queue.Run() } diff --git a/pkg/instances/instances.go b/pkg/instances/instances.go index aaf7e30e0e..324aba0ec1 100644 --- a/pkg/instances/instances.go +++ b/pkg/instances/instances.go @@ -19,6 +19,7 @@ package instances import ( "fmt" "net/http" + "time" "k8s.io/ingress-gce/pkg/utils/namer" "k8s.io/klog" @@ -277,9 +278,9 @@ func (i *Instances) Remove(groupName string, names []string) error { return fmt.Errorf("%v", errs) } -// Sync syncs kubernetes instances with the instances in the instance group. +// Sync nodes with the instances in the instance group. func (i *Instances) Sync(nodes []string) (err error) { - klog.V(4).Infof("Syncing nodes %v", nodes) + klog.V(2).Infof("Syncing nodes %v", nodes) defer func() { // The node pool is only responsible for syncing nodes to instance @@ -296,6 +297,7 @@ func (i *Instances) Sync(nodes []string) (err error) { pool, err := i.List() if err != nil { + klog.Errorf("List error: %v", err) return err } @@ -303,6 +305,7 @@ func (i *Instances) Sync(nodes []string) (err error) { gceNodes := sets.NewString() gceNodes, err = i.list(igName) if err != nil { + klog.Errorf("list(%q) error: %v", igName, err) return err } kubeNodes := sets.NewString(nodes...) @@ -313,16 +316,23 @@ func (i *Instances) Sync(nodes []string) (err error) { removeNodes := gceNodes.Difference(kubeNodes).List() addNodes := kubeNodes.Difference(gceNodes).List() + + klog.V(2).Infof("Removing %d, adding %d nodes", len(removeNodes), len(addNodes)) + + start := time.Now() if len(removeNodes) != 0 { - klog.V(4).Infof("Removing nodes from IG: %v", removeNodes) - if err = i.Remove(igName, removeNodes); err != nil { + err = i.Remove(igName, removeNodes) + klog.V(2).Infof("Remove(%q, _) = %v (took %s); nodes = %v", igName, err, time.Now().Sub(start), removeNodes) + if err != nil { return err } } + start = time.Now() if len(addNodes) != 0 { - klog.V(4).Infof("Adding nodes to IG: %v", addNodes) - if err = i.Add(igName, addNodes); err != nil { + err = i.Add(igName, addNodes) + klog.V(2).Infof("Add(%q, _) = %v (took %s); nodes = %v", igName, err, time.Now().Sub(start), addNodes) + if err != nil { return err } }