diff --git a/pkg/neg/readiness/reflector.go b/pkg/neg/readiness/reflector.go
index faf9b74401..c232d3d625 100644
--- a/pkg/neg/readiness/reflector.go
+++ b/pkg/neg/readiness/reflector.go
@@ -17,11 +17,13 @@ limitations under the License.
 package readiness
 
 import (
+	"fmt"
+	"reflect"
 	"sync"
 	"time"
 
-	"fmt"
 	"k8s.io/api/core/v1"
+	"k8s.io/apimachinery/pkg/util/clock"
 	"k8s.io/apimachinery/pkg/util/wait"
 	"k8s.io/client-go/kubernetes"
 	"k8s.io/client-go/kubernetes/scheme"
@@ -33,13 +35,21 @@ import (
 	negtypes "k8s.io/ingress-gce/pkg/neg/types"
 	"k8s.io/ingress-gce/pkg/neg/types/shared"
 	"k8s.io/klog"
-	"reflect"
 )
 
 const (
-	maxRetries        = 15
-	negReadyReason    = "LoadBalancerNegReady"
+	maxRetries = 15
+	// negReadyReason is the pod condition reason when the pod becomes Healthy in a NEG or no longer belongs to any NEG
+	negReadyReason = "LoadBalancerNegReady"
+	// negReadyTimedOutReason is the pod condition reason when the timeout is reached but the pod is still not healthy in a NEG
+	negReadyTimedOutReason = "LoadBalancerNegTimeout"
+	// negNotReadyReason is the pod condition reason when the pod is not healthy in a NEG
 	negNotReadyReason = "LoadBalancerNegNotReady"
+	// unreadyTimeout is the timeout for health status feedback on pod readiness. If the load balancer health
+	// check has still not reported the pod as Healthy this long after the pod was created, skip waiting and
+	// mark the pod as load balancer ready.
+	// This is a fail-safe; it should exceed any reasonable amount of time for a healthy infrastructure to catch up.
+	unreadyTimeout = 10 * time.Minute
 )
 
 // readinessReflector implements the Reflector interface
@@ -47,6 +57,7 @@ type readinessReflector struct {
 	// podUpdateLock ensures that at any time there is only one
 	podUpdateLock sync.Mutex
 	client        kubernetes.Interface
+	clock         clock.Clock
 
 	// pollerLock ensures there is only poll
 	pollerLock sync.Mutex
@@ -71,6 +82,7 @@ func NewReadinessReflector(cc *context.ControllerContext, lookup NegLookup) Refl
 	reflector := &readinessReflector{
 		client:           cc.KubeClient,
 		podLister:        cc.PodInformer.GetIndexer(),
+		clock:            clock.RealClock{},
 		lookup:           lookup,
 		eventBroadcaster: broadcaster,
 		eventRecorder:    recorder,
@@ -151,32 +163,44 @@ func (r *readinessReflector) syncPod(key string, neg string) (err error) {
 	}
 	klog.V(4).Infof("Syncing Pod %q", key)
 
-	expectedCondition := v1.PodCondition{Type: shared.NegReadinessGate}
-	var message, reason string
-
+	expectedCondition := r.getExpectedNegCondition(pod, neg)
+	return r.ensurePodNegCondition(pod, expectedCondition)
+}
+
+// getExpectedNegCondition returns the expected NEG readiness condition for the given pod
+func (r *readinessReflector) getExpectedNegCondition(pod *v1.Pod, neg string) v1.PodCondition {
+	expectedCondition := v1.PodCondition{Type: shared.NegReadinessGate}
 	if len(neg) > 0 {
 		expectedCondition.Status = v1.ConditionTrue
-		reason = negReadyReason
-		message = fmt.Sprintf("Pod has become Healthy in NEG %q. Marking condition %q to True.", neg, shared.NegReadinessGate)
-	} else {
-		negs := r.lookup.ReadinessGateEnabledNegs(pod.Namespace, pod.Labels)
-		// mark pod as ready if it belongs to no NEGs
-		if len(negs) == 0 {
-			expectedCondition.Status = v1.ConditionTrue
-			reason = negReadyReason
-			message = fmt.Sprintf("Pod does not belong to any NEG. Marking condition %q to True.", shared.NegReadinessGate)
-		} else {
-			// do not patch condition status in this case to prevent race condition:
-			// 1. poller marks a pod ready
-			// 2. syncPod gets call and does not retrieve the updated pod spec with true neg readiness condition
-			// 3. syncPod patches the neg readiness condition to be false
-			reason = negNotReadyReason
-			message = fmt.Sprintf("Waiting for pod to become healthy in at least one of the NEG(s): %v", negs)
-		}
-	}
-	expectedCondition.Reason = reason
-	expectedCondition.Message = message
-	return r.ensurePodNegCondition(pod, expectedCondition)
+		expectedCondition.Reason = negReadyReason
+		expectedCondition.Message = fmt.Sprintf("Pod has become Healthy in NEG %q. Marking condition %q to True.", neg, shared.NegReadinessGate)
+		return expectedCondition
+	}
+
+	negs := r.lookup.ReadinessGateEnabledNegs(pod.Namespace, pod.Labels)
+	// mark pod as ready if it belongs to no NEGs
+	if len(negs) == 0 {
+		expectedCondition.Status = v1.ConditionTrue
+		expectedCondition.Reason = negReadyReason
+		expectedCondition.Message = fmt.Sprintf("Pod does not belong to any NEG. Marking condition %q to True.", shared.NegReadinessGate)
+		return expectedCondition
+	}
+
+	// check whether the pod has been waiting too long for its endpoint to show up as Healthy in a NEG
+	if r.clock.Now().After(pod.CreationTimestamp.Add(unreadyTimeout)) {
+		expectedCondition.Status = v1.ConditionTrue
+		expectedCondition.Reason = negReadyTimedOutReason
+		expectedCondition.Message = fmt.Sprintf("Timeout waiting for pod to become healthy in at least one of the NEG(s): %v. Marking condition %q to True.", negs, shared.NegReadinessGate)
+		return expectedCondition
+	}
+
+	// do not patch the condition status in this case, to prevent a race condition:
+	// 1. the poller marks a pod ready
+	// 2. syncPod gets called and does not retrieve the updated pod spec with the true NEG readiness condition
+	// 3. syncPod patches the NEG readiness condition back to false
+	expectedCondition.Reason = negNotReadyReason
+	expectedCondition.Message = fmt.Sprintf("Waiting for pod to become healthy in at least one of the NEG(s): %v", negs)
+	return expectedCondition
 }
 
 // SyncPod filter the pods that needed to be processed and put it into queue
diff --git a/pkg/neg/readiness/reflector_test.go b/pkg/neg/readiness/reflector_test.go
index 27bda78580..10b01a800d 100644
--- a/pkg/neg/readiness/reflector_test.go
+++ b/pkg/neg/readiness/reflector_test.go
@@ -21,6 +21,7 @@ import (
 	"k8s.io/api/core/v1"
 	apiv1 "k8s.io/api/core/v1"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+	"k8s.io/apimachinery/pkg/util/clock"
 	"k8s.io/client-go/kubernetes/fake"
 	"k8s.io/ingress-gce/pkg/context"
 	negtypes "k8s.io/ingress-gce/pkg/neg/types"
@@ -77,6 +78,9 @@ func TestSyncPod(t *testing.T) {
 	podLister := testReadinessReflector.podLister
 	testlookUp := testReadinessReflector.lookup.(*fakeLookUp)
 	podName := "pod1"
+	fakeClock := clock.NewFakeClock(time.Now())
+	testReadinessReflector.clock = fakeClock
+	now := metav1.NewTime(fakeClock.Now()).Rfc3339Copy()
 
 	for _, tc := range []struct {
 		desc string
@@ -147,9 +151,10 @@ func TestSyncPod(t *testing.T) {
 			},
 		},
 		{
-			desc: "need to update pod and there is Negs associated",
+			desc: "need to update pod: there are NEGs associated but pod is not healthy",
 			mutateState: func() {
 				pod := generatePod(testNamespace, podName, true, false, false)
+				pod.CreationTimestamp = now
 				podLister.Update(pod)
 				client.CoreV1().Pods(testNamespace).Update(pod)
 				testlookUp.readinessGateEnabledNegs = []string{"neg1", "neg2"}
@@ -159,8 +164,9 @@ func TestSyncPod(t *testing.T) {
 			expectExists: true,
 			expectPod: &v1.Pod{
 				ObjectMeta: metav1.ObjectMeta{
-					Namespace: testNamespace,
-					Name:      podName,
+					Namespace:         testNamespace,
+					Name:              podName,
+					CreationTimestamp: now,
 				},
 				Spec: v1.PodSpec{
 					ReadinessGates: []v1.PodReadinessGate{
@@ -179,7 +185,7 @@ func TestSyncPod(t *testing.T) {
 			},
 		},
 		{
-			desc: "need to update pod and there is Negs associated",
+			desc: "need to update pod: pod is healthy in a NEG",
 			mutateState: func() {
 				pod := generatePod(testNamespace, podName, true, false, false)
 				podLister.Update(pod)
@@ -211,6 +217,42 @@ func TestSyncPod(t *testing.T) {
 				},
 			},
 		},
+		{
+			desc: "timeout waiting for endpoint to become healthy in NEGs",
+			mutateState: func() {
+				pod := generatePod(testNamespace, podName, true, false, false)
+				pod.CreationTimestamp = now
+				podLister.Update(pod)
+				client.CoreV1().Pods(testNamespace).Update(pod)
+				testlookUp.readinessGateEnabledNegs = []string{"neg1", "neg2"}
+				fakeClock.Step(unreadyTimeout)
+			},
+			inputKey:     keyFunc(testNamespace, podName),
+			inputNeg:     "",
+			expectExists: true,
+			expectPod: &v1.Pod{
+				ObjectMeta: metav1.ObjectMeta{
+					Namespace:         testNamespace,
+					Name:              podName,
+					CreationTimestamp: now,
+				},
+				Spec: v1.PodSpec{
+					ReadinessGates: []v1.PodReadinessGate{
+						{ConditionType: shared.NegReadinessGate},
+					},
+				},
+				Status: v1.PodStatus{
+					Conditions: []v1.PodCondition{
+						{
+							Type:    shared.NegReadinessGate,
+							Reason:  negReadyTimedOutReason,
+							Status:  v1.ConditionTrue,
+							Message: fmt.Sprintf("Timeout waiting for pod to become healthy in at least one of the NEG(s): %v. Marking condition %q to True.", []string{"neg1", "neg2"}, shared.NegReadinessGate),
+						},
+					},
+				},
+			},
+		},
 	} {
 		tc.mutateState()
 		err := testReadinessReflector.syncPod(tc.inputKey, tc.inputNeg)
@@ -223,6 +265,8 @@ func TestSyncPod(t *testing.T) {
 		if err != nil {
 			t.Errorf("For test case %q, expect err to be nil, but got %v", tc.desc, err)
 		}
+		// ignore creation timestamp for comparison
+		pod.CreationTimestamp = tc.expectPod.CreationTimestamp
 		if !reflect.DeepEqual(pod, tc.expectPod) {
 			t.Errorf("For test case %q, expect pod to be %v, but got %v", tc.desc, tc.expectPod, pod)
 		}
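
Note for reviewers: the new fail-safe stays testable because the reflector takes an injected clock.Clock (RealClock in production, FakeClock in TestSyncPod) instead of calling time.Now() directly. The standalone sketch below, which is not part of the diff, shows that pattern in isolation; the helper name timedOut is hypothetical, while the constant and the comparison mirror getExpectedNegCondition and the FakeClock usage mirrors TestSyncPod.

// timeoutsketch.go - illustration only; assumes the apimachinery clock package used by this PR.
package main

import (
	"fmt"
	"time"

	"k8s.io/apimachinery/pkg/util/clock"
)

// unreadyTimeout mirrors the constant introduced in reflector.go.
const unreadyTimeout = 10 * time.Minute

// timedOut reproduces the fail-safe condition from getExpectedNegCondition:
// it reports true once more than unreadyTimeout has elapsed since pod creation.
func timedOut(c clock.Clock, created time.Time) bool {
	return c.Now().After(created.Add(unreadyTimeout))
}

func main() {
	created := time.Now()

	// Inject a fake clock, as TestSyncPod does, so the timeout branch
	// can be exercised without a real ten-minute wait.
	fakeClock := clock.NewFakeClock(created)

	fmt.Println(timedOut(fakeClock, created)) // false: the pod was just created

	// Advance fake time past the timeout; the fail-safe now fires.
	fakeClock.Step(unreadyTimeout + time.Second)
	fmt.Println(timedOut(fakeClock, created)) // true
}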