Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[DEVOPS-635] Refactor waiting for resources to be ready #10

Merged
merged 1 commit into from
Dec 28, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
119 changes: 34 additions & 85 deletions pkg/failover/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -340,23 +340,7 @@ func (r *RedisFailoverKubeClient) CreateBootstrapPod(rf *RedisFailover) error {
return err
}

t := r.clock.NewTicker(loopInterval)
for range t.C {
logger.Debug("Waiting for pod to be ready")
ready := false
pod, _ = r.Client.CoreV1().Pods(namespace).Get(name, metav1.GetOptions{})
for _, condition := range pod.Status.Conditions {

if condition.Type == "Ready" && condition.Status == v1.ConditionTrue {
ready = true
break
}
}
if ready {
t.Stop()
break
}
}
r.waitForPod(rf.Metadata.Name, rf.Metadata.Namespace, logger)

return nil
}
Expand Down Expand Up @@ -401,19 +385,7 @@ func (r *RedisFailoverKubeClient) CreateSentinelService(rf *RedisFailover) error
return err
}

t := r.clock.NewTicker(loopInterval)
for range t.C {
logger.Debug("Waiting for service to find bootstrap pod")
endpoints, _ := r.Client.CoreV1().Endpoints(namespace).Get(name, metav1.GetOptions{})
addresses := 0
for _, subset := range endpoints.Subsets {
addresses += len(subset.Addresses)
}
if addresses > 0 {
t.Stop()
break
}
}
r.waitForService(name, namespace, logger)

return nil
}
Expand Down Expand Up @@ -543,15 +515,7 @@ func (r *RedisFailoverKubeClient) CreateSentinelDeployment(rf *RedisFailover) er
return err
}

t := r.clock.NewTicker(loopInterval)
for range t.C {
logger.Debug("Waiting for Sentinel deployment to be fully operative")
deployment, _ := r.Client.AppsV1beta1().Deployments(namespace).Get(name, metav1.GetOptions{})
if deployment.Status.ReadyReplicas == spec.Sentinel.Replicas {
t.Stop()
break
}
}
r.waitForDeployment(name, namespace, spec.Sentinel.Replicas, logger)

logger.Debug("Creating Sentinel PodDisruptionBudget...")
if err := r.createPodDisruptionBudget(rf, sentinelName, sentinelRoleName); err != nil {
Expand Down Expand Up @@ -729,15 +693,7 @@ func (r *RedisFailoverKubeClient) CreateRedisStatefulset(rf *RedisFailover) erro
return err
}

t := r.clock.NewTicker(loopInterval)
for range t.C {
logger.Debug("Waiting for Redis statefulset to be fully operative")
statefulset, _ := r.Client.AppsV1beta1().StatefulSets(namespace).Get(name, metav1.GetOptions{})
if statefulset.Status.ReadyReplicas == spec.Redis.Replicas {
t.Stop()
break
}
}
r.waitForStatefulset(name, namespace, spec.Redis.Replicas, logger)

logger.Debug("Creating Redis PodDisruptionBudget...")
if err := r.createPodDisruptionBudget(rf, redisName, redisRoleName); err != nil {
Expand Down Expand Up @@ -809,7 +765,9 @@ func (r *RedisFailoverKubeClient) createPodDisruptionBudget(rf *RedisFailover, n

// UpdateSentinelDeployment updates the spec of the existing sentinel deployment
func (r *RedisFailoverKubeClient) UpdateSentinelDeployment(rf *RedisFailover) error {
logger := r.logger.WithField(logNameField, rf.Metadata.Name).WithField(logNamespaceField, rf.Metadata.Namespace)
name := r.GetSentinelName(rf)
namespace := rf.Metadata.Namespace
logger := r.logger.WithField(logNameField, name).WithField(logNamespaceField, namespace)

quorum := rf.GetQuorum()
replicas := rf.Spec.Sentinel.Replicas
Expand All @@ -834,26 +792,20 @@ func (r *RedisFailoverKubeClient) UpdateSentinelDeployment(rf *RedisFailover) er
oldSD.Spec.Template.Spec.Containers[0].Image = getRedisImage(rf)
oldSD.Spec.Template.Spec.Containers[0].Resources = getSentinelResources(rf.Spec)

if _, err := r.Client.AppsV1beta1().Deployments(rf.Metadata.Namespace).Update(oldSD); err != nil {
if _, err := r.Client.AppsV1beta1().Deployments(namespace).Update(oldSD); err != nil {
return err
}

t := r.clock.NewTicker(loopInterval)
for range t.C {
logger.Debug("Waiting for Sentinel deployment to be updated")
deployment, _ := r.GetSentinelDeployment(rf)
if deployment.Status.ReadyReplicas == replicas && deployment.Status.UpdatedReplicas == replicas {
t.Stop()
break
}
}
r.waitForDeployment(name, namespace, replicas, logger)

return nil
}

// UpdateRedisStatefulset updates the spec of the existing redis statefulset
func (r *RedisFailoverKubeClient) UpdateRedisStatefulset(rf *RedisFailover) error {
logger := r.logger.WithField(logNameField, rf.Metadata.Name).WithField(logNamespaceField, rf.Metadata.Namespace)
name := r.GetRedisName(rf)
namespace := rf.Metadata.Namespace
logger := r.logger.WithField(logNameField, name).WithField(logNamespaceField, namespace)

replicas := rf.Spec.Redis.Replicas

Expand All @@ -866,19 +818,11 @@ func (r *RedisFailoverKubeClient) UpdateRedisStatefulset(rf *RedisFailover) erro
oldSS.Spec.Template.Spec.Containers[0].Resources = getRedisResources(rf.Spec)
oldSS.Spec.Template.Spec.Containers[0].Image = getRedisImage(rf)

if _, err := r.Client.AppsV1beta1().StatefulSets(rf.Metadata.Namespace).Update(oldSS); err != nil {
if _, err := r.Client.AppsV1beta1().StatefulSets(namespace).Update(oldSS); err != nil {
return err
}

t := r.clock.NewTicker(loopInterval)
for range t.C {
logger.Debug("Waiting for Redis statefulset to be updated")
statefulset, _ := r.GetRedisStatefulset(rf)
if statefulset.Status.ReadyReplicas == replicas && statefulset.Status.UpdatedReplicas == replicas {
t.Stop()
break
}
}
r.waitForStatefulset(name, namespace, replicas, logger)

return nil
}
Expand All @@ -888,20 +832,14 @@ func (r *RedisFailoverKubeClient) DeleteBootstrapPod(rf *RedisFailover) error {
name := r.GetBootstrapName(rf)
namespace := rf.Metadata.Namespace

logger := r.logger.WithField(logNameField, rf.Metadata.Name).WithField(logNamespaceField, rf.Metadata.Namespace)
logger := r.logger.WithField(logNameField, name).WithField(logNamespaceField, namespace)
err := r.Client.CoreV1().Pods(namespace).Delete(name, &metav1.DeleteOptions{})
if err != nil {
return err
}
t := r.clock.NewTicker(loopInterval)
for range t.C {
logger.Debug("Waiting for pod to terminate")
pod, _ := r.Client.CoreV1().Pods(namespace).Get(name, metav1.GetOptions{})
if len(pod.Name) == 0 {
t.Stop()
break
}
}

r.waitForPodDeletion(name, namespace, logger)

return nil
}

Expand All @@ -913,13 +851,15 @@ func (r *RedisFailoverKubeClient) DeleteRedisStatefulset(rf *RedisFailover) erro
if err := r.Client.AppsV1beta1().StatefulSets(namespace).Delete(name, &metav1.DeleteOptions{PropagationPolicy: &propagation}); err != nil {
return err
}
logger := r.logger.WithField(logNameField, rf.Metadata.Name).WithField(logNamespaceField, rf.Metadata.Namespace)
logger := r.logger.WithField(logNameField, name).WithField(logNamespaceField, namespace)
logger.Debug("Deleting Redis PodDisruptionBudget...")
if err := r.deletePodDisruptionBudget(rf, redisName); err != nil {
return err
}
logger.Debug("Redis PodDisruptionBudget deleted!")
// TODO: Wait for statefulset to really delete

r.waitForStatefulsetDeletion(name, namespace, logger)

return nil
}

Expand All @@ -931,36 +871,45 @@ func (r *RedisFailoverKubeClient) DeleteSentinelDeployment(rf *RedisFailover) er
if err := r.Client.AppsV1beta1().Deployments(namespace).Delete(name, &metav1.DeleteOptions{PropagationPolicy: &propagation}); err != nil {
return err
}
logger := r.logger.WithField(logNameField, rf.Metadata.Name).WithField(logNamespaceField, rf.Metadata.Namespace)
logger := r.logger.WithField(logNameField, name).WithField(logNamespaceField, namespace)
logger.Debug("Deleting Sentinel PodDisruptionBudget...")
if err := r.deletePodDisruptionBudget(rf, sentinelName); err != nil {
return err
}
logger.Debug("Sentinel PodDisruptionBudget deleted!")
// TODO: Wait for deployment to really delete

r.waitForDeploymentDeletion(name, namespace, logger)

return nil
}

// DeleteSentinelService deletes a sentinel service
func (r *RedisFailoverKubeClient) DeleteSentinelService(rf *RedisFailover) error {
name := r.GetSentinelName(rf)
namespace := rf.Metadata.Namespace
logger := r.logger.WithField(logNameField, name).WithField(logNamespaceField, namespace)
propagation := metav1.DeletePropagationForeground
if err := r.Client.CoreV1().Services(namespace).Delete(name, &metav1.DeleteOptions{PropagationPolicy: &propagation}); err != nil {
return err
}
// TODO: Wait for service to really delete

r.waitForServiceDeletion(name, namespace, logger)

return nil
}

// DeleteRedisService deletes redis service
func (r *RedisFailoverKubeClient) DeleteRedisService(rf *RedisFailover) error {
name := r.GetRedisName(rf)
namespace := rf.Metadata.Namespace
logger := r.logger.WithField(logNameField, name).WithField(logNamespaceField, namespace)
propagation := metav1.DeletePropagationForeground
if err := r.Client.CoreV1().Services(namespace).Delete(name, &metav1.DeleteOptions{PropagationPolicy: &propagation}); err != nil {
return err
}

r.waitForServiceDeletion(name, namespace, logger)

return nil
}

Expand Down
142 changes: 142 additions & 0 deletions pkg/failover/waiter.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,142 @@
package failover

import (
"github.com/spotahome/redis-operator/pkg/log"

metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/pkg/api/v1"
)

func (r *RedisFailoverKubeClient) waitForPod(name string, namespace string, logger log.Logger) {
t := r.clock.NewTicker(loopInterval)
defer t.Stop()

for range t.C {
logger.Debug("Waiting for pod to be ready")
pod, _ := r.Client.CoreV1().Pods(namespace).Get(name, metav1.GetOptions{})
for _, condition := range pod.Status.Conditions {
if condition.Type == "Ready" && condition.Status == v1.ConditionTrue {
return
}
}
}
}

func (r *RedisFailoverKubeClient) waitForService(name string, namespace string, logger log.Logger) {
t := r.clock.NewTicker(loopInterval)
defer t.Stop()

for range t.C {
logger.Debug("Waiting for service to find bootstrap pod")
endpoints, _ := r.Client.CoreV1().Endpoints(namespace).Get(name, metav1.GetOptions{})
addresses := 0
for _, subset := range endpoints.Subsets {
addresses += len(subset.Addresses)
}
if addresses > 0 {
return
}
}
}

func (r *RedisFailoverKubeClient) waitForDeployment(name string, namespace string, replicas int32, logger log.Logger) {
t := r.clock.NewTicker(loopInterval)
defer t.Stop()

for range t.C {
logger.Debug("Waiting for Sentinel deployment to be fully operative")
deployment, _ := r.Client.AppsV1beta1().Deployments(namespace).Get(name, metav1.GetOptions{})
if deployment.Status.ReadyReplicas == replicas {
return
}
}
}

func (r *RedisFailoverKubeClient) waitForStatefulset(name string, namespace string, replicas int32, logger log.Logger) {
t := r.clock.NewTicker(loopInterval)
defer t.Stop()

for range t.C {
logger.Debug("Waiting for Redis statefulset to be fully operative")
statefulset, _ := r.Client.AppsV1beta1().StatefulSets(namespace).Get(name, metav1.GetOptions{})
if statefulset.Status.ReadyReplicas == replicas {
return
}
}
}

func (r *RedisFailoverKubeClient) waitForPodDeletion(name string, namespace string, logger log.Logger) {
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

All this deletion methods could be reduced to one codebase (no code duplication) and pass an interface taht gets the items.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As we've discussed, It's more complicated to do an interface because runtime.Object does not implement the Items function (for the list). We'd have to do a manual object copy and it does not worth it.

t := r.clock.NewTicker(loopInterval)
defer t.Stop()

for range t.C {
logger.Debug("Waiting for pod to terminate")
podList, _ := r.Client.CoreV1().Pods(namespace).List(metav1.ListOptions{})
found := false
for _, pod := range podList.Items {
if pod.Name == name {
found = true
}
}
if !found {
return
}
}
}

func (r *RedisFailoverKubeClient) waitForStatefulsetDeletion(name string, namespace string, logger log.Logger) {
t := r.clock.NewTicker(loopInterval)
defer t.Stop()

for range t.C {
logger.Debug("Waiting for statefulset to terminate")
statefulsetList, _ := r.Client.AppsV1beta1().StatefulSets(namespace).List(metav1.ListOptions{})
found := false
for _, statefulset := range statefulsetList.Items {
if statefulset.Name == name {
found = true
}
}
if !found {
return
}
}
}

func (r *RedisFailoverKubeClient) waitForDeploymentDeletion(name string, namespace string, logger log.Logger) {
t := r.clock.NewTicker(loopInterval)
defer t.Stop()

for range t.C {
logger.Debug("Waiting for deployment to terminate")
deploymentList, _ := r.Client.Apps().Deployments(namespace).List(metav1.ListOptions{})
found := false
for _, deployment := range deploymentList.Items {
if deployment.Name == name {
found = true
}
}
if !found {
return
}
}
}

func (r *RedisFailoverKubeClient) waitForServiceDeletion(name string, namespace string, logger log.Logger) {
t := r.clock.NewTicker(loopInterval)
defer t.Stop()

for range t.C {
logger.Debug("Waiting for service to disappear")
serviceList, _ := r.Client.Core().Services(namespace).List(metav1.ListOptions{})
found := false
for _, service := range serviceList.Items {
if service.Name == name {
found = true
}
}
if !found {
return
}
}
}