Skip to content

Commit

Permalink
Merge pull request #419 from shangjin92/feature/add-role-label
Browse files Browse the repository at this point in the history
feat(redis): Add pod label of redis role, to support Master/Slave model.
  • Loading branch information
ese authored Aug 24, 2022
2 parents 7714614 + f98e33f commit 7e1a068
Show file tree
Hide file tree
Showing 10 changed files with 193 additions and 16 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
/bin
.bash_history
.vscode
.idea/
14 changes: 14 additions & 0 deletions mocks/service/k8s/Services.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

45 changes: 39 additions & 6 deletions operator/redisfailover/service/check.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,9 +74,27 @@ func (r *RedisFailoverChecker) CheckSentinelNumber(rf *redisfailoverv1.RedisFail
return nil
}

func (r *RedisFailoverChecker) setMasterLabelIfNecessary(namespace string, pod corev1.Pod) error {
for labelKey, labelValue := range pod.ObjectMeta.Labels {
if labelKey == redisRoleLabelKey && labelValue == redisRoleLabelMaster {
return nil
}
}
return r.k8sService.UpdatePodLabels(namespace, pod.ObjectMeta.Name, generateRedisMasterRoleLabel())
}

func (r *RedisFailoverChecker) setSlaveLabelIfNecessary(namespace string, pod corev1.Pod) error {
for labelKey, labelValue := range pod.ObjectMeta.Labels {
if labelKey == redisRoleLabelKey && labelValue == redisRoleLabelSlave {
return nil
}
}
return r.k8sService.UpdatePodLabels(namespace, pod.ObjectMeta.Name, generateRedisSlaveRoleLabel())
}

// CheckAllSlavesFromMaster controlls that all slaves have the same master (the real one)
func (r *RedisFailoverChecker) CheckAllSlavesFromMaster(master string, rf *redisfailoverv1.RedisFailover) error {
rips, err := r.GetRedisesIPs(rf)
rps, err := r.k8sService.GetStatefulSetPods(rf.Namespace, GetRedisName(rf))
if err != nil {
return err
}
Expand All @@ -86,13 +104,26 @@ func (r *RedisFailoverChecker) CheckAllSlavesFromMaster(master string, rf *redis
return err
}

for _, rip := range rips {
slave, err := r.redisClient.GetSlaveOf(rip, password)
for _, rp := range rps.Items {
if rp.Status.PodIP == master {
err = r.setMasterLabelIfNecessary(rf.Namespace, rp)
if err != nil {
return err
}
} else {
err = r.setSlaveLabelIfNecessary(rf.Namespace, rp)
if err != nil {
return err
}
}

slave, err := r.redisClient.GetSlaveOf(rp.Status.PodIP, password)
if err != nil {
r.logger.Errorf("Get slave of master failed, maybe this node is not ready, pod ip: %s", rp.Status.PodIP)
return err
}
if slave != "" && slave != master {
return fmt.Errorf("slave %s don't have the master %s, has %s", rip, master, slave)
return fmt.Errorf("slave %s don't have the master %s, has %s", rp.Status.PodIP, master, slave)
}
}
return nil
Expand Down Expand Up @@ -153,7 +184,8 @@ func (r *RedisFailoverChecker) GetMasterIP(rf *redisfailoverv1.RedisFailover) (s
for _, rip := range rips {
master, err := r.redisClient.IsMaster(rip, password)
if err != nil {
return "", err
r.logger.Errorf("Get redis info failed, maybe this node is not ready, pod ip: %s", rip)
continue
}
if master {
masters = append(masters, rip)
Expand Down Expand Up @@ -182,7 +214,8 @@ func (r *RedisFailoverChecker) GetNumberMasters(rf *redisfailoverv1.RedisFailove
for _, rip := range rips {
master, err := r.redisClient.IsMaster(rip, password)
if err != nil {
return nMasters, err
r.logger.Errorf("Get redis info failed, maybe this node is not ready, pod ip: %s", rip)
continue
}
if master {
nMasters++
Expand Down
8 changes: 7 additions & 1 deletion operator/redisfailover/service/check_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@ import (
"testing"
"time"

"github.com/stretchr/testify/mock"

"github.com/stretchr/testify/assert"
appsv1 "k8s.io/api/apps/v1"
corev1 "k8s.io/api/core/v1"
Expand Down Expand Up @@ -155,6 +157,7 @@ func TestCheckAllSlavesFromMasterGetStatefulSetError(t *testing.T) {

ms := &mK8SService.Services{}
ms.On("GetStatefulSetPods", namespace, rfservice.GetRedisName(rf)).Once().Return(nil, errors.New(""))
ms.On("UpdatePodLabels", namespace, mock.AnythingOfType("string"), mock.Anything).Once().Return(nil)
mr := &mRedisService.Client{}

checker := rfservice.NewRedisFailoverChecker(ms, mr, log.DummyLogger{})
Expand All @@ -181,6 +184,7 @@ func TestCheckAllSlavesFromMasterGetSlaveOfError(t *testing.T) {

ms := &mK8SService.Services{}
ms.On("GetStatefulSetPods", namespace, rfservice.GetRedisName(rf)).Once().Return(pods, nil)
ms.On("UpdatePodLabels", namespace, mock.AnythingOfType("string"), mock.Anything).Once().Return(nil)
mr := &mRedisService.Client{}
mr.On("GetSlaveOf", "", "").Once().Return("", errors.New(""))

Expand Down Expand Up @@ -208,6 +212,7 @@ func TestCheckAllSlavesFromMasterDifferentMaster(t *testing.T) {

ms := &mK8SService.Services{}
ms.On("GetStatefulSetPods", namespace, rfservice.GetRedisName(rf)).Once().Return(pods, nil)
ms.On("UpdatePodLabels", namespace, mock.AnythingOfType("string"), mock.Anything).Once().Return(nil)
mr := &mRedisService.Client{}
mr.On("GetSlaveOf", "0.0.0.0", "").Once().Return("1.1.1.1", nil)

Expand Down Expand Up @@ -235,6 +240,7 @@ func TestCheckAllSlavesFromMaster(t *testing.T) {

ms := &mK8SService.Services{}
ms.On("GetStatefulSetPods", namespace, rfservice.GetRedisName(rf)).Once().Return(pods, nil)
ms.On("UpdatePodLabels", namespace, mock.AnythingOfType("string"), mock.Anything).Once().Return(nil)
mr := &mRedisService.Client{}
mr.On("GetSlaveOf", "0.0.0.0", "").Once().Return("1.1.1.1", nil)

Expand Down Expand Up @@ -578,7 +584,7 @@ func TestGetNumberMastersIsMasterError(t *testing.T) {
checker := rfservice.NewRedisFailoverChecker(ms, mr, log.DummyLogger{})

_, err := checker.GetNumberMasters(rf)
assert.Error(err)
assert.NoError(err)
}

func TestGetNumberMasters(t *testing.T) {
Expand Down
16 changes: 16 additions & 0 deletions operator/redisfailover/service/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,22 @@ func generateSelectorLabels(component, name string) map[string]string {
}
}

func generateRedisDefaultRoleLabel() map[string]string {
return generateRedisSlaveRoleLabel()
}

func generateRedisMasterRoleLabel() map[string]string {
return map[string]string{
redisRoleLabelKey: redisRoleLabelMaster,
}
}

func generateRedisSlaveRoleLabel() map[string]string {
return map[string]string{
redisRoleLabelKey: redisRoleLabelSlave,
}
}

// EnsureSentinelService makes sure the sentinel service exists
func (r *RedisFailoverKubeClient) EnsureSentinelService(rf *redisfailoverv1.RedisFailover, labels map[string]string, ownerRefs []metav1.OwnerReference) error {
svc := generateSentinelService(rf, labels, ownerRefs)
Expand Down
6 changes: 6 additions & 0 deletions operator/redisfailover/service/constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,3 +26,9 @@ const (
appLabel = "redis-failover"
hostnameTopologyKey = "kubernetes.io/hostname"
)

const (
redisRoleLabelKey = "redisfailovers-role"
redisRoleLabelMaster = "master"
redisRoleLabelSlave = "slave"
)
2 changes: 2 additions & 0 deletions operator/redisfailover/service/generator.go
Original file line number Diff line number Diff line change
Expand Up @@ -250,6 +250,8 @@ func generateRedisStatefulSet(rf *redisfailoverv1.RedisFailover, labels map[stri
redisCommand := getRedisCommand(rf)
selectorLabels := generateSelectorLabels(redisRoleName, rf.Name)
labels = util.MergeLabels(labels, selectorLabels)
labels = util.MergeLabels(labels, generateRedisDefaultRoleLabel())

volumeMounts := getRedisVolumeMounts(rf)
volumes := getRedisVolumes(rf)
terminationGracePeriodSeconds := getTerminationGracePeriodSeconds(rf)
Expand Down
71 changes: 64 additions & 7 deletions operator/redisfailover/service/heal.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"github.com/spotahome/redis-operator/log"
"github.com/spotahome/redis-operator/service/k8s"
"github.com/spotahome/redis-operator/service/redis"
v1 "k8s.io/api/core/v1"
)

// RedisFailoverHeal defines the interface able to fix the problems on the redis failovers
Expand Down Expand Up @@ -41,13 +42,45 @@ func NewRedisFailoverHealer(k8sService k8s.Services, redisClient redis.Client, l
}
}

func (r *RedisFailoverHealer) setMasterLabelIfNecessary(namespace string, pod v1.Pod) error {
for labelKey, labelValue := range pod.ObjectMeta.Labels {
if labelKey == redisRoleLabelKey && labelValue == redisRoleLabelMaster {
return nil
}
}
return r.k8sService.UpdatePodLabels(namespace, pod.ObjectMeta.Name, generateRedisMasterRoleLabel())
}

func (r *RedisFailoverHealer) setSlaveLabelIfNecessary(namespace string, pod v1.Pod) error {
for labelKey, labelValue := range pod.ObjectMeta.Labels {
if labelKey == redisRoleLabelKey && labelValue == redisRoleLabelSlave {
return nil
}
}
return r.k8sService.UpdatePodLabels(namespace, pod.ObjectMeta.Name, generateRedisSlaveRoleLabel())
}

func (r *RedisFailoverHealer) MakeMaster(ip string, rf *redisfailoverv1.RedisFailover) error {
password, err := k8s.GetRedisPassword(r.k8sService, rf)
if err != nil {
return err
}

return r.redisClient.MakeMaster(ip, password)
err = r.redisClient.MakeMaster(ip, password)
if err != nil {
return err
}

rps, err := r.k8sService.GetStatefulSetPods(rf.Namespace, GetRedisName(rf))
if err != nil {
return err
}
for _, rp := range rps.Items {
if rp.Status.PodIP == ip {
return r.setMasterLabelIfNecessary(rf.Namespace, rp)
}
}
return nil
}

// SetOldestAsMaster puts all redis to the same master, choosen by order of appearance
Expand All @@ -73,14 +106,26 @@ func (r *RedisFailoverHealer) SetOldestAsMaster(rf *redisfailoverv1.RedisFailove
newMasterIP := ""
for _, pod := range ssp.Items {
if newMasterIP == "" {
newMasterIP = pod.Status.PodIP
r.logger.Debugf("New master is %s with ip %s", pod.Name, newMasterIP)
if err := r.redisClient.MakeMaster(newMasterIP, password); err != nil {
r.logger.Infof("New master is %s with ip %s", pod.Name, pod.Status.PodIP)
if err := r.redisClient.MakeMaster(pod.Status.PodIP, password); err != nil {
r.logger.Errorf("Make new master failed, master ip: %s, error: %v", pod.Status.PodIP, err)
continue
}

err = r.setMasterLabelIfNecessary(rf.Namespace, pod)
if err != nil {
return err
}

newMasterIP = pod.Status.PodIP
} else {
r.logger.Debugf("Making pod %s slave of %s", pod.Name, newMasterIP)
r.logger.Infof("Making pod %s slave of %s", pod.Name, newMasterIP)
if err := r.redisClient.MakeSlaveOf(pod.Status.PodIP, newMasterIP, password); err != nil {
r.logger.Errorf("Make slave failed, slave pod ip: %s, master ip: %s, error: %v", pod.Status.PodIP, newMasterIP, err)
}

err = r.setSlaveLabelIfNecessary(rf.Namespace, pod)
if err != nil {
return err
}
}
Expand All @@ -102,13 +147,25 @@ func (r *RedisFailoverHealer) SetMasterOnAll(masterIP string, rf *redisfailoverv

for _, pod := range ssp.Items {
if pod.Status.PodIP == masterIP {
r.logger.Debugf("Ensure pod %s is master", pod.Name)
r.logger.Infof("Ensure pod %s is master", pod.Name)
if err := r.redisClient.MakeMaster(masterIP, password); err != nil {
r.logger.Errorf("Make master failed, master ip: %s, error: %v", masterIP, err)
return err
}

err = r.setMasterLabelIfNecessary(rf.Namespace, pod)
if err != nil {
return err
}
} else {
r.logger.Debugf("Making pod %s slave of %s", pod.Name, masterIP)
r.logger.Infof("Making pod %s slave of %s", pod.Name, masterIP)
if err := r.redisClient.MakeSlaveOf(pod.Status.PodIP, masterIP, password); err != nil {
r.logger.Errorf("Make slave failed, slave ip: %s, master ip: %s, error: %v", pod.Status.PodIP, masterIP, err)
return err
}

err = r.setSlaveLabelIfNecessary(rf.Namespace, pod)
if err != nil {
return err
}
}
Expand Down
Loading

0 comments on commit 7e1a068

Please sign in to comment.