K8S/Kubernetes: Load-Balancing the Pod Count in Detail


1. Architecture Diagram

(Architecture diagram)

In Kubernetes (K8s), Pods can be created dynamically in several ways. The most common methods are:

1. Using the kubectl run command

This is the simplest way to create a Pod. You can use the kubectl run command to create one quickly. For example:

kubectl run my-pod --image=nginx --restart=Never

This command creates a Pod named my-pod from the nginx image. --restart=Never means the container is not restarted when it exits; re-running the same command will fail because a Pod with that name already exists.

2. Using a Pod YAML file

You can write a YAML configuration file defining the Pod spec and create the Pod with kubectl apply. For example, here is a sample YAML file, pod.yaml:

apiVersion: v1
kind: Pod
metadata:
  name: my-pod
spec:
  containers:
  - name: my-container
    image: nginx

Then run the following command to create the Pod:

kubectl apply -f pod.yaml

3. Using a Deployment

Creating bare Pods directly is usually discouraged, because Pods are ephemeral and easily destroyed. Instead, use a Deployment to manage Pods. Here is an example YAML file, deployment.yaml:

apiVersion: apps/v1
kind: Deployment
metadata:
  name: my-deployment
spec:
  replicas: 3
  selector:
    matchLabels:
      app: my-app
  template:
    metadata:
      labels:
        app: my-app
    spec:
      containers:
      - name: my-container
        image: nginx

Then run the following command to create the Deployment:

kubectl apply -f deployment.yaml

With the Deployment above, Kubernetes will create and manage three nginx Pods.

4. Using auto-scaling (dynamic scaling)

You can combine a Horizontal Pod Autoscaler (HPA) with a Deployment to scale the number of Pods up and down dynamically. First define a Deployment, then create an HPA for it:

Define the Deployment:

apiVersion: apps/v1
kind: Deployment
metadata:
  name: my-deployment
spec:
  replicas: 1
  selector:
    matchLabels:
      app: my-app
  template:
    metadata:
      labels:
        app: my-app
    spec:
      containers:
      - name: my-container
        image: nginx
        resources:
          requests:
            cpu: "100m"
          limits:
            cpu: "200m"

Define the HPA:

apiVersion: autoscaling/v1
kind: HorizontalPodAutoscaler
metadata:
  name: my-deployment-hpa
spec:
  scaleTargetRef:
    apiVersion: apps/v1
    kind: Deployment
    name: my-deployment
  minReplicas: 1
  maxReplicas: 5
  targetCPUUtilizationPercentage: 50

Apply the HPA (saved as hpa.yaml) with the following command:

kubectl apply -f hpa.yaml

Summary

In Kubernetes, Pods can be created dynamically with the kubectl command-line tool or with configuration files. In production, it is recommended to manage Pods with a Deployment and to combine it with a Horizontal Pod Autoscaler to adjust the Pod count dynamically as load changes.

2. Code Logic

A scheduled task runs every 20 seconds (wrapped in defer/recover to survive panics):

1. Gather information

- Fetch all u3servers from the database

- Fetch the deployment configuration

2. Walk the database u3servers and repair their state

- 2.1 Mark servers without a heartbeat for deletion

- No heartbeat: hbTime more than 1 min old && createTime more than 2 min old

3. Walk the deployment configuration against the database u3servers

- 3.1 Filter the current u3 servers by serviceName and version

- 3.2 Compute the currently available u3 count and the target u3 count (target = max(totalUser/avgUser + 1, totalMeeting/avgMeeting + 1))

The currently available count must include servers that were just created but are not yet serving (no heartbeat, created less than two minutes ago).

- When u3 servers are lacking:

- Set enabled to true on disabled servers (except those with status=1)

- If there are still not enough, launch new u3 pods and insert the corresponding u3server rows

- When there are too many u3 servers:

- Set enabled to false

- Disable strategy:

- Prefer disabling servers with the fewest users

- If all user counts are 0, prefer the largest createTime

- Enabling uses the opposite order

- 3.3 Mark releasable servers (enabled=false, userCount=0, 0 < releaseTime < now) for deletion

4. Walk the database u3servers (heartbeat-less ones were already removed)

- If serviceName+version is not in the deployment configuration and enabled=true, mark the server unavailable by setting enabled=false

- If serviceName+version is not in the deployment configuration, enabled=false, and the server is releasable, mark it for deletion

5. Walk the servers marked for deletion

- Delete the k8s pod, call back to bbs, and delete the database record
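The target-count formula from step 3.2 can be sketched as a standalone program. The function name and the sample numbers below are hypothetical; the real getTargetCount in check.go additionally applies minCount, bakCount, and a global maximum.

```go
package main

import "fmt"

// targetCount mirrors the formula from step 3.2:
// target = max(totalUser/avgUser + 1, totalMeeting/avgMeeting + 1)
func targetCount(totalUser, avgUser, totalMeeting, avgMeeting int) int {
	n1 := totalUser/avgUser + 1
	n2 := totalMeeting/avgMeeting + 1
	if n1 > n2 {
		return n1
	}
	return n2
}

func main() {
	// 950 users at 100 per server -> 10 pods; 40 meetings at 10 per server -> 5 pods.
	// The user dimension dominates, so the target is 10.
	fmt.Println(targetCount(950, 100, 40, 10))
}
```

The "+1" keeps one pod's worth of headroom in each dimension, so the fleet scales up one scan cycle before either limit is actually hit.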

3. Database Design

(Database table schema diagram)

4. Code Implementation

4.1 client.go

Creates the k8s client.

package k8s

import (
	"k8s.io/client-go/kubernetes"
	"k8s.io/client-go/rest"
)

var clientSet *kubernetes.Clientset

func Init() {
	// creates the in-cluster config
	config, err := rest.InClusterConfig()
	if err != nil {
		panic(err)
	}
	// creates the clientSet
	cs, err := kubernetes.NewForConfig(config)
	if err != nil {
		panic(err)
	}
	clientSet = cs
}

func GetClientSet() *kubernetes.Clientset {
	return clientSet
}
4.2 pod.go

Calls the k8s API to manage pods: creating, querying, releasing, and so on.

package u3

import (
	"context"
	"encoding/json"
	"fmt"
	"github.com/pkg/errors"
	tv1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"mgr/config"
	"mgr/dao"
	"mgr/k8s"
	"os"
	"path/filepath"
	"strings"
	"support/collection/_set"
	"time"
)

func getPodContent() ([]byte, error) {
	var fileName string
	env := config.Config.EnvStr
	switch env {
	case "dev":
		fileName = "pod_dev.json"
	case "test":
		fileName = "pod_test.json"
	case "www":
		fileName = "pod_www.json"
	default:
		return nil, errors.New("unknown env: " + env)
	}
	filePath := filepath.Join("./resource", fileName)
	return os.ReadFile(filePath)
}

func GetUpimeDeploymentByK8s() (map[string]*UpimeDeployment, error) {
	var upimeDeploymentMap = make(map[string]*UpimeDeployment)
	namespace := config.Config.Namespace
	label := "manager=u3mgr"
	// label := "manager=confhpa"
	pods, err := k8s.GetClientSet().CoreV1().Pods(namespace).List(context.TODO(), metav1.ListOptions{
		LabelSelector: label,
	})
	if err != nil {
		return upimeDeploymentMap, err
	}
	for _, pod := range pods.Items {
		var name = pod.Labels["app"]
		var version = pod.Labels["version"]
		var key = fmt.Sprintf("%s-%s", name, version)
		upimeDeploymentMap[key] = &UpimeDeployment{
			name:        name, // u3 | u3rk
			serviceName: name, // serviceName is simply taken from name here
			version:     version,
			imageName:   pod.Labels["imageName"],
			pods:        make(map[string]*UpimePod),
			expireTime:  time.Now().Add(time.Minute * -1), // author's note: the reason for the -1min offset has been forgotten
			isServing:   true,
		}
	}
	return upimeDeploymentMap, err
}

func startPod(server *dao.TblU3Server, deploy *dao.TblDeploy) error {
	content, err := getPodContent()
	if err != nil {
		return err
	}
	var pod = tv1.Pod{}
	err = json.Unmarshal(content, &pod)
	if err != nil {
		return err
	}
	rollId := server.RollId
	pod.Name = server.Hostname
	pod.Labels["app"] = deploy.ServiceName
	pod.Labels["version"] = deploy.ImageVersion
	pod.Labels["imageName"] = deploy.ImageName
	pod.Labels["ROLLID"] = rollId

	// spec
	pod.Spec.Hostname = pod.Name
	pod.Spec.Containers[0].Image = "docker.plaso.cn/" + deploy.ImageName + ":" + deploy.ImageVersion
	pod.Spec.Containers[0].Env = append(pod.Spec.Containers[0].Env, tv1.EnvVar{Name: "ROLLID", Value: rollId})
	pod.Spec.Containers[0].Env = append(pod.Spec.Containers[0].Env, tv1.EnvVar{Name: "SERVICE_NAME", Value: deploy.ServiceName})
	pod.Spec.Containers[0].Env = append(pod.Spec.Containers[0].Env, tv1.EnvVar{Name: "VERSION", Value: deploy.ImageVersion})
	if deploy.LaunchCmd != "" {
		pod.Spec.Containers[0].Command = []string{deploy.LaunchCmd}
	}
	ns := config.Config.Namespace
	_, err = k8s.GetClientSet().CoreV1().Pods(ns).Create(context.TODO(), &pod, metav1.CreateOptions{})
	return err
}
func getPodName(name string, version string, index interface{}) string {
	return fmt.Sprintf("%s%s-%v", name, strings.Replace(version, ".", "", -1), index)
}

func getNextPodName(nameSet *_set.Set[string], name string, version string, startIndex int) string {
	for {
		var hostName = getPodName(name, version, startIndex)
		if !nameSet.Has(hostName) {
			nameSet.Add(hostName)
			return hostName
		} else {
			startIndex++
		}
	}
}

func getRollId(rollIdMap map[string]string, sv string) string {
	randomString := "abcdefghijklmnopqrstuvwxyz"
	v, ok := rollIdMap[sv]
	if ok {
		return v
	}
	for i := 0; i < len(randomString); i++ {
		rollId := string(randomString[i])
		if !isRollIdExist(rollIdMap, rollId) {
			rollIdMap[sv] = rollId
			return rollId
		}
	}
	return "U"
}

func isRollIdExist(rollIdMap map[string]string, rollId string) bool {
	for _, v := range rollIdMap {
		if v == rollId {
			return true
		}
	}
	return false
}

func releasePod(server *dao.TblU3Server) error {
	if server.CanDelete() {
		ns := config.Config.Namespace
		return k8s.GetClientSet().CoreV1().Pods(ns).Delete(context.TODO(), server.Hostname, metav1.DeleteOptions{})
	}
	return nil
}

func releasePodDirect(hostname string) error {
	ns := config.Config.Namespace
	return k8s.GetClientSet().CoreV1().Pods(ns).Delete(context.TODO(), hostname, metav1.DeleteOptions{})
}
4.3 cron.go

A timer scans the configuration and computes how many pods need to be created or deleted, keeping the pod count at the desired level.

package cronjob

import (
	"github.com/robfig/cron/v3"
	"mgr/handle/u3"
	"support/logger"
)

func Init() {
	c := cron.New()
	checkU3(c)
	c.Start()
}

func checkU3(c *cron.Cron) {

	log := logger.LogPrefix("[checkU3]")
	_, err := c.AddFunc("@every 20s", func() {
		log.Info("checkU3 Begin")
		u3.Check(log)
		log.Info("checkU3 end")
	})
	if err != nil {
		log.Error("checkU3 failed as: %s", err)
	}
}
4.4 check.go

This file contains the core logic for maintaining the pod count.

package u3

import (
	"context"
	"fmt"
	"mgr/config"
	"mgr/dao"
	"regexp"
	"runtime/debug"
	"sort"
	"support/collection/_set"
	"support/http_util"
	"support/logger"
	"support/math"
	db2 "support/web/db"
)

var nameSet = _set.New[string]()

func Check(log logger.ILog) {
	defer func() {
		if err := recover(); err != nil {
			logger.Error("Check panic as: %s\n%s", err, debug.Stack())
		}
	}()
	db := db2.Db(context.Background())
	u3servers, err := dao.GetU3Server(db)
	if err != nil {
		log.Error("GetU3Server failed as err:%v", err)
		return
	}
	log.Debug(" size:%d ,u3servers:%s", len(u3servers), u3servers)

	nameSet = getHostNameSet(u3servers)
	rollIdMap := getRollIdMap(u3servers)
	// servers marked for deletion
	serversForDelete := make([]*dao.TblU3Server, 0)
	serversForDelete, u3servers = dealNoHeartServers(u3servers)
	// query the deployment configuration
	deploys, err := dao.QueryDeploy(db)
	if err != nil {
		log.Error("QueryDeploy failed as err:%v", err)
		return
	}
	deploys = filterValidDeploys(deploys)
	log.Debug("size:%d ,deploys:%s", len(deploys), deploys)
	svSet := _set.New[string]()
	releaseSet := _set.New[int]()
	for _, deploy := range deploys {
		var serviceName = deploy.ServiceName
		var version = deploy.ImageVersion
		svSet.Add(getSv(serviceName, version))
		filter := func(server *dao.TblU3Server) bool {
			return serviceName == server.ServiceName && version == server.Version
		}
		currServers := GetServersFilter(u3servers, filter)
		targetCount := getTargetCount(currServers, deploy.AvgUser, deploy.AvgMeeting,
			deploy.MinCount, deploy.BakCount)
		currServers, justCreatingCount := getServersFilterJustCreating(currServers)
		log.Debug("serviceName:%s,justCreatingCount:%d", serviceName, justCreatingCount)
		// count the currently enabled servers plus the just-created ones
		expectCount := len(GetServersFilter(currServers, func(server *dao.TblU3Server) bool {
			return server.Enabled
		})) + justCreatingCount
		if targetCount > expectCount {
			// enable pod
			count := targetCount - expectCount
			dealEnablePod(log, currServers, count, deploy, nameSet, rollIdMap)
		} else if targetCount < expectCount {
			// disable pod
			count := expectCount - targetCount
			dealDisablePod(log, currServers, count)
		}
		// collect releasable servers
		dealCanRelease(currServers, releaseSet)
		// collect deletable servers
		serversForDelete = append(serversForDelete, dealCanDelete(currServers)...)
	}
	// servers whose serviceName/version no longer matches any deploy also need to be released
	serversForDelete = append(serversForDelete, dealSvAndRelease(log, u3servers, svSet, releaseSet)...)

	// delete the pods
	dealDeletePod(log, serversForDelete)
}

func dealNoHeartServers(servers []*dao.TblU3Server) ([]*dao.TblU3Server, []*dao.TblU3Server) {
	serversForDelete := make([]*dao.TblU3Server, 0)
	serversLeft := make([]*dao.TblU3Server, 0)
	for _, server := range servers {
		// no heartbeat: mark for deletion
		if server.IsNoHeart() {
			serversForDelete = append(serversForDelete, server)
		} else {
			serversLeft = append(serversLeft, server)
		}
	}
	return serversForDelete, serversLeft
}

func GetServersFilter(servers []*dao.TblU3Server, filter func(server *dao.TblU3Server) bool) []*dao.TblU3Server {
	s := make([]*dao.TblU3Server, 0)
	for _, server := range servers {
		if filter(server) {
			s = append(s, server)
		}
	}
	return s
}

func getTargetCount(servers []*dao.TblU3Server, avgUser, avgMeeting, minCount, bakCount int) int {
	var totalUser, totalMeeting int
	for _, server := range servers {
		totalUser += server.UserCount
		totalMeeting += server.NodeCount
	}

	n1 := totalUser/avgUser + 1 + bakCount
	n2 := totalMeeting/avgMeeting + 1 + bakCount
	ret := math.Max(n1, n2, minCount)
	ret = math.Min(ret, config.Config.MaxU3Count)
	return ret
}

func getServersFilterJustCreating(servers []*dao.TblU3Server) ([]*dao.TblU3Server, int) {
	s := make([]*dao.TblU3Server, 0)
	for _, server := range servers {
		if !server.IsJustCreating() {
			s = append(s, server)
		}
	}
	justCreatingCount := len(servers) - len(s)
	return s, justCreatingCount
}

func dealEnablePod(log logger.ILog, servers []*dao.TblU3Server, count int, deploy *dao.TblDeploy,
	nameSet *_set.Set[string], rollIdMap map[string]string) {
	// first try re-enabling disabled servers; if that is not enough, launch new pods
	disableServers := GetServersFilter(servers, func(server *dao.TblU3Server) bool {
		return !server.Enabled
	})
	sort.Slice(disableServers, func(i, j int) bool {
		return disableServers[i].CreateTime < disableServers[j].CreateTime
	})
	db := db2.Db(context.Background())
	if len(disableServers) < count {
		entities := make([]*dao.TblU3Server, 0)
		// launch new pods
		startCount := count - len(disableServers)
		for i := 0; i < startCount; i++ {
			hostname := getNextPodName(nameSet, deploy.ServiceName, deploy.ImageVersion, i)
			serviceName := deploy.ServiceName
			version := deploy.ImageVersion
			rollId := getRollId(rollIdMap, getSv(serviceName, version))
			server := dao.BuildU3Server(hostname, rollId, serviceName, version)
			if err := startPod(server, deploy); err != nil {
				log.Error("startPod failed as err:%v", err)
				continue
			}
			// record only servers whose pod was actually created
			entities = append(entities, server)
		}
		if err := dao.UpsertU3servers(db, entities); err != nil {
			log.Error("UpsertU3servers failed as err:%v", err)
		}
	}
	num := math.Min(len(disableServers), count)
	ids := make([]int, num)
	for i := 0; i < num; i++ {
		// update the database
		disableServers[i].Enabled = true
		modifyServers(servers, disableServers[i])
		ids[i] = disableServers[i].Id
	}
	err := dao.UpdateServersEnabled(db, ids)
	if err != nil {
		log.Error("UpdateServersEnabled failed as err:%v", err)
	}
	return
}
func dealDisablePod(log logger.ILog, servers []*dao.TblU3Server, count int) {

	enableServers := GetServersFilter(servers, func(server *dao.TblU3Server) bool {
		return server.Enabled
	})
	sort.Slice(enableServers, func(i, j int) bool {
		if enableServers[i].UserCount == enableServers[j].UserCount {
			return enableServers[i].CreateTime > enableServers[j].CreateTime
		} else {
			return enableServers[i].UserCount < enableServers[j].UserCount
		}
	})
	count = math.Min(count, len(enableServers)) // guard: just-created pods can inflate expectCount past the enabled list
	ids := make([]int, count)
	for i := 0; i < count; i++ {
		// update the database
		enableServers[i].Enabled = false
		modifyServers(servers, enableServers[i])
		ids[i] = enableServers[i].Id
	}
	db := db2.Db(context.Background())
	err := dao.UpdateServersDisabled(db, ids)
	if err != nil {
		log.Error("UpdateServersDisabled failed as err:%v", err)
	}
}

func dealDeletePod(log logger.ILog, serversForDelete []*dao.TblU3Server) {

	sd := make([]*dao.TblU3Server, 0)
	// delete the pods
	for _, server := range serversForDelete {
		log.Debug("releasePod start server:%s", server)
		if err := releasePod(server); err != nil {
			log.Error("releasePod failed as err:%v", err)
		}
		// call back to bbs; failures are not retried for now
		if err := rpcBbsServerDelete(server.Hostname); err != nil {
			log.Error("rpcBbsServerDelete failed as err:%v", err)
		}
		sd = append(sd, server)
		log.Debug("releasePod success end server:%s", server)
	}
	db := db2.Db(context.Background())
	if err := dao.DeleteServers(db, sd); err != nil {
		log.Error("DeleteServers failed as err:%v", err)
	}
}

func getHostNameSet(servers []*dao.TblU3Server) *_set.Set[string] {
	for _, s := range servers {
		nameSet.Add(s.Hostname)
	}
	return nameSet
}

// key:serviceName+version
func getRollIdMap(servers []*dao.TblU3Server) map[string]string {
	rollIdMap := make(map[string]string, 0)
	for _, s := range servers {
		sv := getSv(s.ServiceName, s.Version)
		rollIdMap[sv] = s.RollId
	}
	return rollIdMap
}

func dealSvAndRelease(log logger.ILog, servers []*dao.TblU3Server, svSet *_set.Set[string], rsSet *_set.Set[int]) []*dao.TblU3Server {
	ids := make([]int, 0)
	serversForDelete := make([]*dao.TblU3Server, 0)
	for _, server := range servers {
		sv := getSv(server.ServiceName, server.Version)
		if !svSet.Has(sv) {
			// only disable this server if another enabled server for the same service exists
			if server.Enabled && hasOtherEnabledServer(servers, server.ServiceName) {
				ids = append(ids, server.Id)
				server.Enabled = false
			}
			if server.IsReadyRelease() {
				rsSet.Add(server.Id)
			}
			if server.CanDelete() {
				serversForDelete = append(serversForDelete, server)
			}
		}
	}
	db := db2.Db(context.Background())
	if err := dao.UpdateServersDisabled(db, ids); err != nil {
		log.Error("UpdateServersDisabled failed as err:%v", err)
	}
	if err := dao.UpdateServersRelease(db, rsSet.Slice()); err != nil {
		log.Error("UpdateServersRelease failed as err:%v", err)
	}
	return serversForDelete
}

func dealCanDelete(servers []*dao.TblU3Server) []*dao.TblU3Server {
	serversForDelete := make([]*dao.TblU3Server, 0)
	for _, s := range servers {
		if s.CanDelete() {
			serversForDelete = append(serversForDelete, s)
		}
	}
	return serversForDelete
}

func dealCanRelease(servers []*dao.TblU3Server, rsSet *_set.Set[int]) {
	for _, server := range servers {
		if server.IsReadyRelease() {
			rsSet.Add(server.Id)
		}
	}
}

func modifyServers(servers []*dao.TblU3Server, server *dao.TblU3Server) {
	for i, s := range servers {
		if s.Id == server.Id {
			// assign through the index; assigning to the range variable has no effect
			servers[i] = server
		}
	}
}

func getSv(serviceName, version string) string {
	return fmt.Sprintf("%s_%s", serviceName, version)
}

func rpcBbsServerDelete(hostname string) error {
	bbsUrl := config.Config.BbsUrl + "/upime3/serverDelete"
	body := map[string]any{
		"hostname": hostname,
	}
	if err := http_util.PlasoPost(bbsUrl, body, nil); err != nil {
		return err
	}
	return nil
}

func getServiceNameEnableCount(servers []*dao.TblU3Server, serviceName string) int {
	count := 0
	for _, server := range servers {
		if server.ServiceName == serviceName && server.Enabled {
			count++
		}
	}
	return count
}

func hasOtherEnabledServer(servers []*dao.TblU3Server, serviceName string) bool {
	if getServiceNameEnableCount(servers, serviceName) > 1 {
		return true
	}
	return false
}

func DeleteU3Server(hostname string) {
	log := logger.LogPrefix(fmt.Sprintf("DeleteU3Server [hostname: %s]", hostname))
	db := db2.Db(context.Background())
	if err := releasePodDirect(hostname); err != nil {
		log.Error("release pod failed, err: %s", err)
	}
	log.Debug("release pod success")
	if err := dao.DeleteServerByHostname(db, hostname); err != nil {
		log.Error("DeleteServerByHostname failed as err: %s", err)
	}
	log.Debug("delete db record success")
	if err := rpcBbsServerDelete(hostname); err != nil {
		log.Error("rpcBbsServerDelete failed as err: %s", err)
	}
	log.Debug("rpc u3balance success")
}

func filterValidDeploys(deploys []*dao.TblDeploy) []*dao.TblDeploy {
	r := regexp.MustCompile(`^[0-9a-z.-]+$`)
	res := make([]*dao.TblDeploy, 0, len(deploys))
	for _, deploy := range deploys {
		name := deploy.ServiceName + deploy.ImageVersion
		if r.MatchString(name) {
			res = append(res, deploy)
		} else {
			logger.Error("invalid serviceName or version, name: %s", name)
		}
	}
	return res
}
4.5 server.go

Database model and access functions for the server table.

package dao

import (
	"gorm.io/gorm"
	"support/util"
	"time"
)

type TblU3Server struct {
	Id          int    `gorm:"column:id"`
	RollId      string `gorm:"column:rollId"`
	Version     string `gorm:"column:version"`
	HbTime      int64  `gorm:"column:hbTime"`
	CreateTime  int64  `gorm:"column:createTime"`
	Enabled     bool   `gorm:"column:enabled"`
	Hostname    string `gorm:"column:hostname"`
	ServiceName string `gorm:"column:serviceName"`
	ReleaseTime int64  `gorm:"column:releaseTime"`
	NodeCount   int    `gorm:"column:nodeCount"`
	UserCount   int    `gorm:"column:userCount"`
}

func (us *TblU3Server) String() string {
	return util.ConvertToJsonStr(us)
}

const tblU3Server = "tbl_u3_server"

func GetU3Server(db *gorm.DB) ([]*TblU3Server, error) {
	var res []*TblU3Server
	err := db.Table(tblU3Server).Find(&res).Error
	return res, err
}

func BuildU3Server(hostname, rollId, serviceName, version string) *TblU3Server {
	return &TblU3Server{
		Hostname:    hostname,
		RollId:      rollId,
		CreateTime:  util.NowMs(),
		ServiceName: serviceName,
		Version:     version,
	}
}

func UpsertU3servers(db *gorm.DB, entities []*TblU3Server) error {
	if len(entities) == 0 {
		return nil
	}
	return db.Table(tblU3Server).Save(entities).Error
}

func (us *TblU3Server) SetForDelete() {
	us.Enabled = false
	us.UserCount = 0
	us.ReleaseTime = util.NowMs()
}

func (us *TblU3Server) IsNoHeart() bool {
	now := util.NowMs()
	f1 := now-us.HbTime > time.Minute.Milliseconds()
	f2 := now-us.CreateTime > 2*time.Minute.Milliseconds()
	return f1 && f2
}

func (us *TblU3Server) CanDelete() bool {
	now := util.NowMs()
	return !us.Enabled && us.UserCount == 0 && us.ReleaseTime > 0 && us.ReleaseTime < now
}

func (us *TblU3Server) IsJustCreating() bool {
	now := util.NowMs()
	return us.HbTime == 0 && now-us.CreateTime < 2*time.Minute.Milliseconds()
}

func (us *TblU3Server) IsReadyRelease() bool {
	return !us.Enabled && us.UserCount == 0 && us.ReleaseTime == 0
}

func UpdateServersEnabled(db *gorm.DB, ids []int) error {
	if len(ids) == 0 {
		return nil
	}
	return db.Table(tblU3Server).
		Where("id in ?", ids).
		Updates(map[string]any{"enabled": true, "releaseTime": 0}).Error
}

func UpdateServersDisabled(db *gorm.DB, ids []int) error {
	if len(ids) == 0 {
		return nil
	}
	return db.Table(tblU3Server).
		Where("id in ?", ids).
		Updates(map[string]any{"enabled": false}).Error
}

func DeleteServers(db *gorm.DB, entities []*TblU3Server) error {
	if len(entities) == 0 {
		return nil
	}
	return db.Table(tblU3Server).Delete(&entities).Error
}

func DeleteServerByHostname(db *gorm.DB, hostname string) error {
	return db.Table(tblU3Server).
		Where("hostname = ?", hostname).
		Delete(nil).Error
}

func UpdateServersRelease(db *gorm.DB, ids []int) error {
	if len(ids) == 0 {
		return nil
	}
	return db.Table(tblU3Server).Where("id in ?", ids).
		Update("releaseTime", time.Now().Add(time.Minute).UnixMilli()).Error
}
4.6 deploy.go

The deploy table, used to control how many pods are needed.

package dao

import (
	"gorm.io/gorm"
	"support/util"
)

const tblDeploy = "tbl_deploy"

//const tblDeploy = "tbl_deploy_copy1"

type TblDeploy struct {
	MinCount     int    `gorm:"column:minCount"`
	BakCount     int    `gorm:"column:bakCount"`
	AvgUser      int    `gorm:"column:avgUser"`
	AvgMeeting   int    `gorm:"column:avgMeeting"`
	Name         string `gorm:"column:name"`
	ImageVersion string `gorm:"column:imageVersion"`
	ServiceName  string `gorm:"column:serviceName"`
	ImageName    string `gorm:"column:imageName"`
	LaunchCmd    string `gorm:"column:launchCmd"`
}

func (us *TblDeploy) String() string {
	return util.ConvertToJsonStr(us)
}
func QueryDeploy(db *gorm.DB) ([]*TblDeploy, error) {
	var deployInfos []*TblDeploy
	err := db.Table(tblDeploy).Find(&deployInfos).Error
	return deployInfos, err
}

5. Code Walkthrough


Solving the pod name collision problem:
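The de-duplication logic of getNextPodName in pod.go can be reproduced standalone; in this sketch a plain map stands in for the _set.Set used in the real code:

```go
package main

import (
	"fmt"
	"strings"
)

// podName builds "<name><version-without-dots>-<index>", as getPodName does.
func podName(name, version string, index int) string {
	return fmt.Sprintf("%s%s-%d", name, strings.ReplaceAll(version, ".", ""), index)
}

// nextPodName bumps the index until the name is unused, then reserves it.
func nextPodName(used map[string]bool, name, version string, index int) string {
	for {
		n := podName(name, version, index)
		if !used[n] {
			used[n] = true
			return n
		}
		index++
	}
}

func main() {
	used := map[string]bool{"u3100-0": true} // "u3100-0" already exists in the cluster
	fmt.Println(nextPodName(used, "u3", "1.0.0", 0)) // skips the taken name and returns "u3100-1"
}
```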


Using a regex to filter out non-conforming pod names:
