Skip to content

Commit

Permalink
remove loop from flush(). start/stop informers depending on k8s master
Browse files Browse the repository at this point in the history
  • Loading branch information
agau4779 committed Dec 13, 2018
1 parent fa7203e commit 3240156
Showing 1 changed file with 57 additions and 37 deletions.
94 changes: 57 additions & 37 deletions pkg/e2e/status.go
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,15 @@ func (sm *StatusManager) startInformer() {
cmInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{
UpdateFunc: func(old, cur interface{}) {
curCm := cur.(*v1.ConfigMap)
upgradeTs := curCm.Data[masterUpgradingKey]
// We need to pick up the master-upgrading flag here, because otherwise
// if the k8s master is in the process of upgrading we can't access
// the ConfigMap.
if len(upgradeTs) > 0 {
glog.V(2).Infof("Master upgrade began at %v", upgradeTs)
sm.setMasterUpgrading(upgradeTs)
}

if len(curCm.Data[exitKey]) > 0 {
glog.V(2).Infof("ConfigMap was updated with exit switch at %s", curCm.Data[exitKey])
close(sm.informerCh)
Expand All @@ -124,13 +133,13 @@ func (sm *StatusManager) startInformer() {
},
})

glog.V(4).Info("Started ConfigMap informer")
glog.V(2).Info("Started ConfigMap informer")
sm.informerRunning = true
go cmInformer.Run(sm.informerCh)
}

func (sm *StatusManager) stopInformer() {
glog.V(4).Info("Stopped ConfigMap informer")
glog.V(2).Info("Stopped ConfigMap informer")
sm.informerRunning = false
close(sm.informerCh)
}
Expand All @@ -152,66 +161,77 @@ func (sm *StatusManager) putStatus(key string, status IngressStability) {
sm.cm.Data[key] = string(status)
}

func (sm *StatusManager) setMasterUpgrading(ts string) {
sm.f.lock.Lock()
defer sm.f.lock.Unlock()
if sm.cm.Data == nil {
sm.cm.Data = make(map[string]string)
}

sm.cm.Data[masterUpgradingKey] = ts
}

func (sm *StatusManager) masterUpgrading() bool {
return len(sm.cm.Data[masterUpgradingKey]) > 0
}

func (sm *StatusManager) masterUpgraded() bool {
if len(sm.cm.Data[masterUpgradedKey]) > 0 {
glog.V(4).Infof("Master has successfully upgraded at %s", sm.cm.Data[masterUpgradedKey])
return true
}
return false
return len(sm.cm.Data[masterUpgradedKey]) > 0
}

func (sm *StatusManager) flush() {
sm.f.lock.Lock()
defer sm.f.lock.Unlock()

glog.V(3).Infof("Attempting to flush %v", sm.cm.Data)

// If master is in the process of upgrading, we exit early and turn off the
// ConfigMap informer.
if sm.masterUpgrading() && sm.informerRunning {
sm.stopInformer()
return
}

// Restart ConfigMap informer if it was previously shut down
if sm.masterUpgraded() && !sm.informerRunning {
if !sm.informerRunning && sm.masterUpgraded() {
glog.V(2).Infof("Master has successfully upgraded at %s", sm.cm.Data[masterUpgradedKey])
sm.startInformer()
}

// Loop until we successfully update the config map
for {
updatedCm, err := sm.f.Clientset.Core().ConfigMaps("default").Get(configMapName, metav1.GetOptions{})
if err != nil {
// K8s considers its version of the ConfigMap to be latest, so we must get
// the configmap from k8s first.
updatedCm, err := sm.f.Clientset.Core().ConfigMaps("default").Get(configMapName, metav1.GetOptions{})
// The k8s API returns an empty ConfigMap upon error - we return early in
// order to not overwrite our ConfigMap data.
if err != nil {
// if the k8s master is upgrading, we suppress the error message because
// the error is expected.
if !sm.masterUpgrading() {
glog.Warningf("Error getting ConfigMap: %v", err)
}

if updatedCm.Data == nil {
updatedCm.Data = make(map[string]string)
}
glog.Warningf("Error getting ConfigMap: %v", err)
return
}

// K8s considers its version of the ConfigMap to be latest, so we must get
// the configmap from k8s first.
// We give precedence to the master-upgraded and master-upgrading flags
// set by the external test framework, but otherwise we prioritize
// Ingress statuses set by StatusManager.
for key, value := range sm.cm.Data {
if key != masterUpgradedKey && key != masterUpgradingKey {
updatedCm.Data[key] = value
}
}
sm.cm = updatedCm
sm.cm.Name = configMapName

_, err = sm.f.Clientset.Core().ConfigMaps("default").Update(sm.cm)
if err != nil {
glog.Warningf("Error updating ConfigMap: %v", err)
} else {
// ConfigMap successfully updated
break
if updatedCm.Data == nil {
updatedCm.Data = make(map[string]string)
}

// We give precedence to the master-upgraded and master-upgrading flags
// set by the external test framework, but otherwise we prioritize
// Ingress statuses set by StatusManager.
for key, value := range sm.cm.Data {
if key != masterUpgradedKey && key != masterUpgradingKey {
updatedCm.Data[key] = value
}
}
glog.V(3).Infof("Flushed statuses to ConfigMap")
glog.V(3).Infof("ConfigMap: %+v", sm.cm)
sm.cm = updatedCm
sm.cm.Name = configMapName

_, err = sm.f.Clientset.Core().ConfigMaps("default").Update(sm.cm)
if err != nil {
glog.Warningf("Error updating ConfigMap: %v", err)
} else {
glog.V(3).Infof("Flushed statuses %v to ConfigMap", sm.cm.Data)
}
}

0 comments on commit 3240156

Please sign in to comment.