zl程序教程

您现在的位置是:首页 >  数据库

当前栏目

2022-03-16 k8s的operator-hub中的redis-operator的redis-cluster的组建redis-cluster集群及cluster高可用处理

Redisk8s集群 处理 2022 可用 16 03
2023-09-27 14:25:42 时间

目录

摘要:

时序图:

核心函数:

RedisClusterReconciler:Reconcile中组建cluster集群及高可用部分:

CheckRedisNodeCount

checkRedisCluster

ExecuteRedisClusterCommand

ExecuteRedisReplicationCommand

ExecuteFailoverOperation

executeFailoverCommand


摘要:

记录k8s的operator-hub中的redis-operator的redis-cluster的组建集群及高可用处理

时序图:

核心函数:

RedisClusterReconciler:Reconcile中组建cluster集群及高可用部分:

	if leaderReplicas == 0 {
		reqLogger.Info("Redis leaders Cannot be 0", "Ready.Replicas", strconv.Itoa(int(redisLeaderInfo.Status.ReadyReplicas)), "Expected.Replicas", leaderReplicas)
		return ctrl.Result{RequeueAfter: time.Second * 120}, nil
	}

	if int32(redisLeaderInfo.Status.ReadyReplicas) != leaderReplicas && int32(redisFollowerInfo.Status.ReadyReplicas) != followerReplicas {
		reqLogger.Info("Redis leader and follower nodes are not ready yet", "Ready.Replicas", strconv.Itoa(int(redisLeaderInfo.Status.ReadyReplicas)), "Expected.Replicas", leaderReplicas)
		return ctrl.Result{RequeueAfter: time.Second * 120}, nil
	}
	reqLogger.Info("Creating redis cluster by executing cluster creation commands", "Leaders.Ready", strconv.Itoa(int(redisLeaderInfo.Status.ReadyReplicas)), "Followers.Ready", strconv.Itoa(int(redisFollowerInfo.Status.ReadyReplicas)))
	if k8sutils.CheckRedisNodeCount(instance, "") != totalReplicas {
		leaderCount := k8sutils.CheckRedisNodeCount(instance, "leader")
		if leaderCount != leaderReplicas {
			reqLogger.Info("Not all leader are part of the cluster...", "Leaders.Count", leaderCount, "Instance.Size", leaderReplicas)
			k8sutils.ExecuteRedisClusterCommand(instance)
		} else {
			if followerReplicas > 0 {
				reqLogger.Info("All leader are part of the cluster, adding follower/replicas", "Leaders.Count", leaderCount, "Instance.Size", leaderReplicas, "Follower.Replicas", followerReplicas)
				k8sutils.ExecuteRedisReplicationCommand(instance)
			} else {
				reqLogger.Info("no follower/replicas configured, skipping replication configuration", "Leaders.Count", leaderCount, "Leader.Size", leaderReplicas, "Follower.Replicas", followerReplicas)
			}
		}
	} else {
		reqLogger.Info("Redis leader count is desired")
		if k8sutils.CheckRedisClusterState(instance) >= int(totalReplicas)-1 {
			reqLogger.Info("Redis leader is not desired, executing failover operation")
			k8sutils.ExecuteFailoverOperation(instance)
		}
		return ctrl.Result{RequeueAfter: time.Second * 120}, nil
	}

CheckRedisNodeCount

// CheckRedisNodeCount will check the count of redis nodes
func CheckRedisNodeCount(cr *redisv1beta1.RedisCluster, nodeType string) int32 {
	var redisNodeType string
	logger := generateRedisManagerLogger(cr.Namespace, cr.ObjectMeta.Name)
	clusterNodes := checkRedisCluster(cr)
	count := len(clusterNodes)

	switch nodeType {
	case "leader":
		redisNodeType = "master"
	case "follower":
		redisNodeType = "slave"
	default:
		redisNodeType = nodeType
	}
	if nodeType != "" {
		count = 0
		for _, node := range clusterNodes {
			if strings.Contains(node[2], redisNodeType) {
				count++
			}
		}
		logger.Info("Number of redis nodes are", "Nodes", strconv.Itoa(count), "Type", nodeType)
	} else {
		logger.Info("Total number of redis nodes are", "Nodes", strconv.Itoa(count))
	}
	return int32(count)
}

checkRedisCluster

// checkRedisCluster will check the redis cluster have sufficient nodes or not
func checkRedisCluster(cr *redisv1beta1.RedisCluster) [][]string {
	var client *redis.Client
	logger := generateRedisManagerLogger(cr.Namespace, cr.ObjectMeta.Name)
	client = configureRedisClient(cr, cr.ObjectMeta.Name+"-leader-0")
	cmd := redis.NewStringCmd("cluster", "nodes")
	err := client.Process(cmd)
	if err != nil {
		logger.Error(err, "Redis command failed with this error")
	}

	output, err := cmd.Result()
	if err != nil {
		logger.Error(err, "Redis command failed with this error")
	}
	logger.Info("Redis cluster nodes are listed", "Output", output)

	csvOutput := csv.NewReader(strings.NewReader(output))
	csvOutput.Comma = ' '
	csvOutput.FieldsPerRecord = -1
	csvOutputRecords, err := csvOutput.ReadAll()
	if err != nil {
		logger.Error(err, "Error parsing Node Counts", "output", output)
	}
	return csvOutputRecords
}

ExecuteRedisClusterCommand

// ExecuteRedisClusterCommand will execute redis cluster creation command
func ExecuteRedisClusterCommand(cr *redisv1beta1.RedisCluster) {
	logger := generateRedisManagerLogger(cr.Namespace, cr.ObjectMeta.Name)
	replicas := cr.Spec.GetReplicaCounts("leader")
	cmd := []string{"redis-cli", "--cluster", "create"}
	for podCount := 0; podCount <= int(replicas)-1; podCount++ {
		pod := RedisDetails{
			PodName:   cr.ObjectMeta.Name + "-leader-" + strconv.Itoa(podCount),
			Namespace: cr.Namespace,
		}
		cmd = append(cmd, getRedisServerIP(pod)+":6379")
	}
	cmd = append(cmd, "--cluster-yes")

	if cr.Spec.KubernetesConfig.ExistingPasswordSecret != nil {
		pass, err := getRedisPassword(cr.Namespace, *cr.Spec.KubernetesConfig.ExistingPasswordSecret.Name, *cr.Spec.KubernetesConfig.ExistingPasswordSecret.Key)
		if err != nil {
			logger.Error(err, "Error in getting redis password")
		}
		cmd = append(cmd, "-a")
		cmd = append(cmd, pass)
	}
	cmd = append(cmd, getRedisTLSArgs(cr.Spec.TLS, cr.ObjectMeta.Name+"-leader-0")...)
	logger.Info("Redis cluster creation command is", "Command", cmd)
	executeCommand(cr, cmd, cr.ObjectMeta.Name+"-leader-0")
}

ExecuteRedisReplicationCommand


// ExecuteRedisReplicationCommand will execute the replication command
func ExecuteRedisReplicationCommand(cr *redisv1beta1.RedisCluster) {
	logger := generateRedisManagerLogger(cr.Namespace, cr.ObjectMeta.Name)
	replicas := cr.Spec.GetReplicaCounts("follower")
	nodes := checkRedisCluster(cr)
	for podCount := 0; podCount <= int(replicas)-1; podCount++ {
		followerPod := RedisDetails{
			PodName:   cr.ObjectMeta.Name + "-follower-" + strconv.Itoa(podCount),
			Namespace: cr.Namespace,
		}
		leaderPod := RedisDetails{
			PodName:   cr.ObjectMeta.Name + "-leader-" + strconv.Itoa(podCount),
			Namespace: cr.Namespace,
		}
		podIP := getRedisServerIP(followerPod)
		if !checkRedisNodePresence(cr, nodes, podIP) {
			logger.Info("Adding node to cluster.", "Node.IP", podIP, "Follower.Pod", followerPod)
			cmd := createRedisReplicationCommand(cr, leaderPod, followerPod)
			executeCommand(cr, cmd, cr.ObjectMeta.Name+"-leader-0")
		} else {
			logger.Info("Skipping Adding node to cluster, already present.", "Follower.Pod", followerPod)
		}
	}
}

ExecuteFailoverOperation

// ExecuteFailoverOperation will execute redis failover operations
func ExecuteFailoverOperation(cr *redisv1beta1.RedisCluster) {
	executeFailoverCommand(cr, "leader")
	executeFailoverCommand(cr, "follower")
}

executeFailoverCommand

// executeFailoverCommand will execute failover command
func executeFailoverCommand(cr *redisv1beta1.RedisCluster, role string) {
	logger := generateRedisManagerLogger(cr.Namespace, cr.ObjectMeta.Name)
	replicas := cr.Spec.GetReplicaCounts(role)
	podName := cr.ObjectMeta.Name + "-" + role + "-"
	for podCount := 0; podCount <= int(replicas)-1; podCount++ {
		logger.Info("Executing redis failover operations", "Redis Node", podName+strconv.Itoa(podCount))
		client := configureRedisClient(cr, podName+strconv.Itoa(podCount))
		cmd := redis.NewStringCmd("cluster", "reset")
		err := client.Process(cmd)
		if err != nil {
			logger.Error(err, "Redis command failed with this error")
			flushcommand := redis.NewStringCmd("flushall")
			err := client.Process(flushcommand)
			if err != nil {
				logger.Error(err, "Redis flush command failed with this error")
			}
		}

		output, err := cmd.Result()
		if err != nil {
			logger.Error(err, "Redis command failed with this error")
		}
		logger.Info("Redis cluster failover executed", "Output", output)
	}
}