Skip to content

Commit

Permalink
Merge pull request #1114 from spencerhance/cp-1107-1-9
Browse files Browse the repository at this point in the history
Cherry Pick #1107 [Wait for caches to sync before running node sync] to release-1.9
  • Loading branch information
k8s-ci-robot committed May 26, 2020
2 parents 4185a9a + 2a74f06 commit 9f26d00
Show file tree
Hide file tree
Showing 2 changed files with 31 additions and 7 deletions.
16 changes: 15 additions & 1 deletion pkg/controller/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,15 @@ limitations under the License.
package controller

import (
"time"

apiv1 "k8s.io/api/core/v1"
listers "k8s.io/client-go/listers/core/v1"
"k8s.io/client-go/tools/cache"
"k8s.io/ingress-gce/pkg/context"
"k8s.io/ingress-gce/pkg/instances"
"k8s.io/ingress-gce/pkg/utils"
"k8s.io/klog"
)

// NodeController synchronizes the state of the nodes to the unmanaged instance
Expand All @@ -34,13 +37,17 @@ type NodeController struct {
queue utils.TaskQueue
// instancePool is a NodePool to manage kubernetes nodes.
instancePool instances.NodePool
// hasSynced returns true if relevant caches have done their initial
// synchronization.
hasSynced func() bool
}

// NewNodeController returns a new node update controller.
func NewNodeController(ctx *context.ControllerContext, instancePool instances.NodePool) *NodeController {
c := &NodeController{
lister: ctx.NodeInformer.GetIndexer(),
instancePool: instancePool,
hasSynced: ctx.HasSynced,
}
c.queue = utils.NewPeriodicTaskQueue("", "nodes", c.sync)

Expand All @@ -60,8 +67,15 @@ func NewNodeController(ctx *context.ControllerContext, instancePool instances.No
return c
}

// Run a goroutine to process updates for the controller.
// Run the queue to process updates for the controller. This must be run in a
// separate goroutine (method will block until queue shutdown).
func (c *NodeController) Run() {
start := time.Now()
for !c.hasSynced() {
klog.V(2).Infof("Waiting for hasSynced (%s elapsed)", time.Now().Sub(start))
time.Sleep(1 * time.Second)
}
klog.V(2).Infof("Caches synced (took %s)", time.Now().Sub(start))
c.queue.Run()
}

Expand Down
22 changes: 16 additions & 6 deletions pkg/instances/instances.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package instances
import (
"fmt"
"net/http"
"time"

"k8s.io/ingress-gce/pkg/utils/namer"
"k8s.io/klog"
Expand Down Expand Up @@ -277,9 +278,9 @@ func (i *Instances) Remove(groupName string, names []string) error {
return fmt.Errorf("%v", errs)
}

// Sync syncs kubernetes instances with the instances in the instance group.
// Sync nodes with the instances in the instance group.
func (i *Instances) Sync(nodes []string) (err error) {
klog.V(4).Infof("Syncing nodes %v", nodes)
klog.V(2).Infof("Syncing nodes %v", nodes)

defer func() {
// The node pool is only responsible for syncing nodes to instance
Expand All @@ -296,13 +297,15 @@ func (i *Instances) Sync(nodes []string) (err error) {

pool, err := i.List()
if err != nil {
klog.Errorf("List error: %v", err)
return err
}

for _, igName := range pool {
gceNodes := sets.NewString()
gceNodes, err = i.list(igName)
if err != nil {
klog.Errorf("list(%q) error: %v", igName, err)
return err
}
kubeNodes := sets.NewString(nodes...)
Expand All @@ -313,16 +316,23 @@ func (i *Instances) Sync(nodes []string) (err error) {

removeNodes := gceNodes.Difference(kubeNodes).List()
addNodes := kubeNodes.Difference(gceNodes).List()

klog.V(2).Infof("Removing %d, adding %d nodes", len(removeNodes), len(addNodes))

start := time.Now()
if len(removeNodes) != 0 {
klog.V(4).Infof("Removing nodes from IG: %v", removeNodes)
if err = i.Remove(igName, removeNodes); err != nil {
err = i.Remove(igName, removeNodes)
klog.V(2).Infof("Remove(%q, _) = %v (took %s); nodes = %v", igName, err, time.Now().Sub(start), removeNodes)
if err != nil {
return err
}
}

start = time.Now()
if len(addNodes) != 0 {
klog.V(4).Infof("Adding nodes to IG: %v", addNodes)
if err = i.Add(igName, addNodes); err != nil {
err = i.Add(igName, addNodes)
klog.V(2).Infof("Add(%q, _) = %v (took %s); nodes = %v", igName, err, time.Now().Sub(start), addNodes)
if err != nil {
return err
}
}
Expand Down

0 comments on commit 9f26d00

Please sign in to comment.