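1. Architecture diagram
Kubernetes (K8s) offers several ways to create Pods dynamically. The most common ones are listed below.
1. Using the kubectl run command
This is the simplest way to create a Pod. For example:
kubectl run my-pod --image=nginx --restart=Never
This command creates a Pod named my-pod from the nginx image and never restarts it after it exits. Running the same command again does not create a second Pod; it fails because a Pod named my-pod already exists.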
2. Using a Pod YAML file
You can write a YAML manifest that defines the Pod spec, for example in a file named pod.yaml, and then create the Pod with:
kubectl apply -f pod.yaml
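3. Using a Deployment
Creating standalone Pods directly is usually not recommended, because Pods are ephemeral and a bare Pod is not recreated when it fails or its node goes away. Use a Deployment to manage Pods instead: describe it in a YAML file such as deployment.yaml, then create it with:
kubectl apply -f deployment.yaml
With replicas set to 3 and nginx as the container image in that file, Kubernetes creates and manages three nginx Pods.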
4. Using auto-scaling
A Horizontal Pod Autoscaler (HPA) can grow and shrink the number of Pods dynamically. First define a Deployment, then create an HPA that scales it.
Define the Deployment:
apiVersion: apps/v1
kind: Deployment
metadata:
  name: my-deployment
spec:
  replicas: 1
  selector:
    matchLabels:
      app: my-app
  template:
    metadata:
      labels:
        app: my-app
    spec:
      containers:
      - name: my-container
        image: nginx
        resources:
          requests:
            cpu: "100m"
          limits:
            cpu: "200m"
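Then define an HPA that targets my-deployment and apply it with kubectl apply. The CPU request set on the container matters here, because CPU utilization targets are measured relative to the requested CPU.
Summary
In Kubernetes, Pods can be created dynamically either with the kubectl command-line tool or from manifest files. In production, manage Pods through a Deployment and combine it with a Horizontal Pod Autoscaler so that the number of Pods follows the load.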
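For intuition about how an HPA chooses a replica count, the Kubernetes documentation describes the core rule as desiredReplicas = ceil(currentReplicas * currentMetricValue / desiredMetricValue). The Go sketch below implements only that formula. The function name and the sample numbers are illustrative assumptions, and the real controller additionally applies a tolerance, min/max bounds, and stabilization windows.

```go
package main

import (
	"fmt"
	"math"
)

// DesiredReplicas applies the basic HPA scaling rule:
// ceil(currentReplicas * currentMetric / targetMetric).
// It is a simplified illustration, not the controller's full algorithm.
func DesiredReplicas(currentReplicas int, currentMetric, targetMetric float64) int {
	if targetMetric <= 0 {
		// Without a valid target there is nothing to scale towards.
		return currentReplicas
	}
	return int(math.Ceil(float64(currentReplicas) * currentMetric / targetMetric))
}

func main() {
	// 2 replicas averaging 90% CPU against a 50% target: ceil(2*90/50) = 4.
	fmt.Println(DesiredReplicas(2, 90, 50))
}
```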
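2. Code logic
Scheduled task (runs every 20s); panics are caught with defer/recover.
1. Load data
- Load all u3server records from the database
- Load the deployment information (what needs to be deployed)
2. Walk the u3server records and correct them
- 2.1 Mark servers without a heartbeat as pending deletion
  - No heartbeat means: hbTime is more than 1 min old && createTime is more than 2 min old
3. Walk the deployment information and reconcile it against the u3server records
- 3.1 Filter the current u3 servers by serviceName and version
- 3.2 Compute the current available u3 count and the target u3 count. As implemented in getTargetCount, target = min(max(totalUser/avgUser + 1 + bakCount, totalMeeting/avgMeeting + 1 + bakCount, minCount), MaxU3Count), using integer division.
  The current available count must also include servers that were just created but are not usable yet (no heartbeat, created less than two minutes ago).
  - When there are too few u3 servers
    - Set enabled to true on disabled servers (except those with status=1)
    - If that is still not enough, start new u3 Pods and add the matching u3server records to the database
  - When there are too many u3 servers
    - Set enabled to false
    - Disable policy
      - Disable the servers with the fewest users first
      - If the user counts are all 0, disable the one with the largest createTime first
      - Enabling works the other way round
- 3.3 Mark servers that can be released (enabled is false, userCount is 0, 0 < releaseTime < now) as pending deletion
4. Walk the u3server records again (those without a heartbeat have already been removed)
- If serviceName+version is not in the deployment information and enabled is true, make the server unavailable by setting enabled=false
- If serviceName+version is not in the deployment information, enabled=false, and the server can be released, mark it as pending deletion
5. Walk the servers pending deletion
- Delete the k8s Pod, call back bbs, and delete the database record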
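The target count from step 3.2 is easier to follow with concrete numbers. The following is a minimal standalone sketch of that formula, not the article's getTargetCount itself: the name TargetCount, the parameter order, and the choice to treat a non-positive avgUser or avgMeeting as 1 are assumptions made for illustration.

```go
package main

import "fmt"

// TargetCount computes
// min(max(totalUser/avgUser+1+bak, totalMeeting/avgMeeting+1+bak, minCount), maxCount)
// with integer division, as described in step 3.2.
func TargetCount(totalUser, totalMeeting, avgUser, avgMeeting, bakCount, minCount, maxCount int) int {
	// Assumption: a non-positive capacity is treated as 1 to avoid dividing by zero.
	if avgUser <= 0 {
		avgUser = 1
	}
	if avgMeeting <= 0 {
		avgMeeting = 1
	}
	byUser := totalUser/avgUser + 1 + bakCount
	byMeeting := totalMeeting/avgMeeting + 1 + bakCount

	target := byUser
	if byMeeting > target {
		target = byMeeting
	}
	if minCount > target {
		target = minCount
	}
	if target > maxCount {
		target = maxCount
	}
	return target
}

func main() {
	// 250 users at 100 per server -> 2+1+1 = 4; 30 meetings at 20 per server -> 1+1+1 = 3.
	fmt.Println(TargetCount(250, 30, 100, 20, 1, 2, 10))
}
```

The "+ 1" means there is always at least one server with spare capacity, and bakCount adds further standby servers on top of that.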
3. Database design
4. Implementation
4.1 client.go
Creates the k8s client.
package k8s

import (
	"k8s.io/client-go/kubernetes"
	"k8s.io/client-go/rest"
)

var clientSet *kubernetes.Clientset

func Init() {
	// creates the in-cluster config
	config, err := rest.InClusterConfig()
	if err != nil {
		panic(err)
	}
	// creates the clientSet
	cs, err := kubernetes.NewForConfig(config)
	if err != nil {
		panic(err)
	}
	clientSet = cs
}

func GetClientSet() *kubernetes.Clientset {
	return clientSet
}
4.2 pod.go
Manages Pods through the k8s API: create, list, release, and so on.
package u3

import (
	"context"
	"encoding/json"
	"fmt"
	"os"
	"path/filepath"
	"strings"
	"time"

	"github.com/pkg/errors"
	tv1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"

	"mgr/config"
	"mgr/dao"
	"mgr/k8s"
	"support/collection/_set"
)

// getPodContent reads the Pod template (JSON) for the current environment.
func getPodContent() ([]byte, error) {
	var fileName string
	env := config.Config.EnvStr
	switch env {
	case "dev":
		fileName = "pod_dev.json"
	case "test":
		fileName = "pod_test.json"
	case "www":
		fileName = "pod_www.json"
	default:
		return nil, errors.New("unknown env: " + env)
	}
	filePath := filepath.Join("./resource", fileName)
	return os.ReadFile(filePath)
}

func GetUpimeDeploymentByK8s() (map[string]*UpimeDeployment, error) {
	upimeDeploymentMap := make(map[string]*UpimeDeployment)
	namespace := config.Config.Namespace
	label := "manager=u3mgr"
	// label := "manager=confhpa"
	pods, err := k8s.GetClientSet().CoreV1().Pods(namespace).List(context.TODO(), metav1.ListOptions{
		// Pass the selector as-is; fmt.Sprintf(label) would treat it as a format string.
		LabelSelector: label,
	})
	if err != nil {
		return upimeDeploymentMap, err
	}
	for _, pod := range pods.Items {
		name := pod.Labels["app"]
		version := pod.Labels["version"]
		key := fmt.Sprintf("%s-%s", name, version)
		upimeDeploymentMap[key] = &UpimeDeployment{
			name:        name, // u3 | u3rk
			serviceName: name, // serviceName simply reuses name here
			version:     version,
			imageName:   pod.Labels["imageName"],
			pods:        make(map[string]*UpimePod),
			expireTime:  time.Now().Add(time.Minute * -1), // the reason for the -1min offset is no longer remembered
			isServing:   true,
		}
	}
	return upimeDeploymentMap, nil
}

// startPod fills the Pod template with per-server values and creates the Pod.
func startPod(server *dao.TblU3Server, deploy *dao.TblDeploy) error {
	content, err := getPodContent()
	if err != nil {
		return err
	}
	var pod = tv1.Pod{}
	err = json.Unmarshal(content, &pod)
	if err != nil {
		return err
	}
	// Guard against templates that would otherwise cause a panic below.
	if len(pod.Spec.Containers) == 0 {
		return errors.New("pod template has no containers")
	}
	if pod.Labels == nil {
		pod.Labels = map[string]string{}
	}
	rollId := server.RollId
	pod.Name = server.Hostname
	pod.Labels["app"] = deploy.ServiceName
	pod.Labels["version"] = deploy.ImageVersion
	pod.Labels["imageName"] = deploy.ImageName
	pod.Labels["ROLLID"] = rollId
	// spec
	pod.Spec.Hostname = pod.Name
	pod.Spec.Containers[0].Image = "docker.plaso.cn/" + deploy.ImageName + ":" + deploy.ImageVersion
	pod.Spec.Containers[0].Env = append(pod.Spec.Containers[0].Env, tv1.EnvVar{Name: "ROLLID", Value: rollId})
	pod.Spec.Containers[0].Env = append(pod.Spec.Containers[0].Env, tv1.EnvVar{Name: "SERVICE_NAME", Value: deploy.ServiceName})
	pod.Spec.Containers[0].Env = append(pod.Spec.Containers[0].Env, tv1.EnvVar{Name: "VERSION", Value: deploy.ImageVersion})
	if deploy.LaunchCmd != "" {
		pod.Spec.Containers[0].Command = []string{deploy.LaunchCmd}
	}
	ns := config.Config.Namespace
	_, err = k8s.GetClientSet().CoreV1().Pods(ns).Create(context.TODO(), &pod, metav1.CreateOptions{})
	return err
}

// getPodName builds "<name><version without dots>-<index>".
func getPodName(name string, version string, index interface{}) string {
	return fmt.Sprintf("%s%s-%v", name, strings.Replace(version, ".", "", -1), index)
}

// getNextPodName returns the first unused name at or after startIndex and reserves it.
func getNextPodName(nameSet *_set.Set[string], name string, version string, startIndex int) string {
	for {
		var hostName = getPodName(name, version, startIndex)
		if !nameSet.Has(hostName) {
			nameSet.Add(hostName)
			return hostName
		} else {
			startIndex++
		}
	}
}

// getRollId returns the roll ID for a serviceName+version key, allocating an unused letter if needed.
func getRollId(rollIdMap map[string]string, sv string) string {
	randomString := "abcdefghijklmnopqrstuvwxyz"
	v, ok := rollIdMap[sv]
	if ok {
		return v
	}
	for i := 0; i < len(randomString); i++ {
		rollId := string(randomString[i])
		if !isRollIdExist(rollIdMap, rollId) {
			rollIdMap[sv] = rollId
			return rollId
		}
	}
	return "U"
}

func isRollIdExist(rollIdMap map[string]string, rollId string) bool {
	for _, v := range rollIdMap {
		if v == rollId {
			return true
		}
	}
	return false
}

// releasePod deletes the Pod only when the server record says it can be deleted.
func releasePod(server *dao.TblU3Server) error {
	if server.CanDelete() {
		ns := config.Config.Namespace
		return k8s.GetClientSet().CoreV1().Pods(ns).Delete(context.TODO(), server.Hostname, metav1.DeleteOptions{})
	}
	return nil
}

// releasePodDirect deletes the Pod unconditionally.
func releasePodDirect(hostname string) error {
	ns := config.Config.Namespace
	return k8s.GetClientSet().CoreV1().Pods(ns).Delete(context.TODO(), hostname, metav1.DeleteOptions{})
}
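
The naming scheme used by getPodName and getNextPodName can be tried in isolation. The sketch below is a standard-library approximation: it swaps the article's _set.Set[string] for a plain map[string]bool, and the names podName and nextPodName are hypothetical.

```go
package main

import (
	"fmt"
	"strings"
)

// podName builds "<name><version without dots>-<index>", as getPodName does.
func podName(name, version string, index int) string {
	return fmt.Sprintf("%s%s-%d", name, strings.ReplaceAll(version, ".", ""), index)
}

// nextPodName returns the first name at or after startIndex that is not in
// used, and records it so that the next call skips it.
func nextPodName(used map[string]bool, name, version string, startIndex int) string {
	for i := startIndex; ; i++ {
		candidate := podName(name, version, i)
		if !used[candidate] {
			used[candidate] = true
			return candidate
		}
	}
}

func main() {
	used := map[string]bool{"u3120-0": true}
	fmt.Println(nextPodName(used, "u3", "1.2.0", 0)) // u3120-1
	fmt.Println(nextPodName(used, "u3", "1.2.0", 0)) // u3120-2
}
```

Because the dots are stripped, versions such as 1.2.0 and 12.0 map to the same prefix; the set of used names is what keeps the resulting Pod names unique.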
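4.3 cron.go
A timer periodically scans the configuration and works out how many Pods currently need to be created or deleted, which keeps the Pod count maintained.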
package cronjob

import (
	"github.com/robfig/cron/v3"

	"mgr/handle/u3"
	"support/logger"
)

func Init() {
	c := cron.New()
	checkU3(c)
	c.Start()
}

// checkU3 registers the u3 check to run every 20 seconds.
func checkU3(c *cron.Cron) {
	log := logger.LogPrefix("[checkU3]")
	_, err := c.AddFunc("@every 20s", func() {
		log.Info("checkU3 Begin")
		u3.Check(log)
		log.Info("checkU3 end")
	})
	if err != nil {
		log.Error("checkU3 failed as: %s", err)
	}
}
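
The same pattern, a periodic job whose panics must not stop later runs, can be sketched with only the standard library. This is an illustration of the idea rather than a description of how robfig/cron works internally; runEvery and safeRun are hypothetical helper names, and the short interval in main is only there to make the demo finish quickly.

```go
package main

import (
	"fmt"
	"time"
)

// safeRun calls job and converts a panic into an error, so that one bad run
// does not prevent later runs.
func safeRun(job func()) (err error) {
	defer func() {
		if r := recover(); r != nil {
			err = fmt.Errorf("job panicked: %v", r)
		}
	}()
	job()
	return nil
}

// runEvery runs job once per interval until stop is closed.
func runEvery(interval time.Duration, stop <-chan struct{}, job func()) {
	ticker := time.NewTicker(interval)
	defer ticker.Stop()
	for {
		select {
		case <-ticker.C:
			if err := safeRun(job); err != nil {
				fmt.Println("check failed:", err)
			}
		case <-stop:
			return
		}
	}
}

func main() {
	stop := make(chan struct{})
	go runEvery(10*time.Millisecond, stop, func() { fmt.Println("check") })
	time.Sleep(35 * time.Millisecond)
	close(stop)
}
```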
4.4 check.go
This file holds the main logic that maintains the Pod count.
package u3

import (
	"context"
	"fmt"
	"regexp"
	"runtime/debug"
	"sort"

	"mgr/config"
	"mgr/dao"
	"support/collection/_set"
	"support/http_util"
	"support/logger"
	"support/math"
	db2 "support/web/db"
)

var nameSet = _set.New[string]()

// Check runs one reconciliation pass over all u3 servers and deployments.
func Check(log logger.ILog) {
	defer func() {
		if err := recover(); err != nil {
			logger.Error("Check panic as: %s\n%s", err, debug.Stack())
		}
	}()
	db := db2.Db(context.Background())
	u3servers, err := dao.GetU3Server(db)
	if err != nil {
		log.Error("GetU3Server failed as err:%v", err)
		return
	}
	log.Debug(" size:%d ,u3servers:%s", len(u3servers), u3servers)
	nameSet = getHostNameSet(u3servers)
	rollIdMap := getRollIdMap(u3servers)
	// Servers pending deletion; servers without a heartbeat go in first.
	serversForDelete, u3servers := dealNoHeartServers(u3servers)
	// Load the deployment information.
	deploys, err := dao.QueryDeploy(db)
	if err != nil {
		log.Error("QueryDeploy failed as err:%v", err)
		return
	}
	deploys = filterValidDeploys(deploys)
	log.Debug("size:%d ,deploys:%s", len(deploys), deploys)
	svSet := _set.New[string]()
	releaseSet := _set.New[int]()
	for _, deploy := range deploys {
		var serviceName = deploy.ServiceName
		var version = deploy.ImageVersion
		svSet.Add(getSv(serviceName, version))
		filter := func(server *dao.TblU3Server) bool {
			return serviceName == server.ServiceName && version == server.Version
		}
		currServers := GetServersFilter(u3servers, filter)
		targetCount := getTargetCount(currServers, deploy.AvgUser, deploy.AvgMeeting,
			deploy.MinCount, deploy.BakCount)
		currServers, justCreatingCount := getServersFilterJustCreating(currServers)
		log.Debug("serviceName:%s,justCreatingCount:%d", serviceName, justCreatingCount)
		// Count the currently enabled servers plus the servers that were just created.
		expectCount := len(GetServersFilter(currServers, func(server *dao.TblU3Server) bool {
			return server.Enabled
		})) + justCreatingCount
		if targetCount > expectCount {
			// enable pod
			count := targetCount - expectCount
			dealEnablePod(log, currServers, count, deploy, nameSet, rollIdMap)
		} else if targetCount < expectCount {
			// disable pod
			count := expectCount - targetCount
			dealDisablePod(log, currServers, count)
		}
		// Servers ready to be released.
		dealCanRelease(currServers, releaseSet)
		// Servers pending deletion.
		serversForDelete = append(serversForDelete, dealCanDelete(currServers)...)
	}
	// Servers whose service/version no longer matches any deployment also need to be released.
	serversForDelete = append(serversForDelete, dealSvAndRelease(log, u3servers, svSet, releaseSet)...)
	// Release the Pods.
	dealDeletePod(log, serversForDelete)
}

// dealNoHeartServers splits servers into (pending deletion, remaining).
func dealNoHeartServers(servers []*dao.TblU3Server) ([]*dao.TblU3Server, []*dao.TblU3Server) {
	serversForDelete := make([]*dao.TblU3Server, 0)
	serversLeft := make([]*dao.TblU3Server, 0)
	for _, server := range servers {
		// No heartbeat: mark for deletion.
		if server.IsNoHeart() {
			serversForDelete = append(serversForDelete, server)
		} else {
			serversLeft = append(serversLeft, server)
		}
	}
	return serversForDelete, serversLeft
}

func GetServersFilter(servers []*dao.TblU3Server, filter func(server *dao.TblU3Server) bool) []*dao.TblU3Server {
	s := make([]*dao.TblU3Server, 0)
	for _, server := range servers {
		if filter(server) {
			s = append(s, server)
		}
	}
	return s
}

func getTargetCount(servers []*dao.TblU3Server, avgUser, avgMeeting, minCount, bakCount int) int {
	var totalUser, totalMeeting int
	for _, server := range servers {
		totalUser += server.UserCount
		totalMeeting += server.NodeCount
	}
	// A zero avgUser or avgMeeting would panic on division, so that load term is skipped.
	n1, n2 := 1+bakCount, 1+bakCount
	if avgUser > 0 {
		n1 += totalUser / avgUser
	}
	if avgMeeting > 0 {
		n2 += totalMeeting / avgMeeting
	}
	ret := math.Max(n1, n2, minCount)
	ret = math.Min(ret, config.Config.MaxU3Count)
	return ret
}

// getServersFilterJustCreating returns the servers that are past the creation
// window, plus the number of servers that are still being created.
func getServersFilterJustCreating(servers []*dao.TblU3Server) ([]*dao.TblU3Server, int) {
	s := make([]*dao.TblU3Server, 0)
	for _, server := range servers {
		if !server.IsJustCreating() {
			s = append(s, server)
		}
	}
	justCreatingCount := len(servers) - len(s)
	return s, justCreatingCount
}

func dealEnablePod(log logger.ILog, servers []*dao.TblU3Server, count int, deploy *dao.TblDeploy,
	nameSet *_set.Set[string], rollIdMap map[string]string) {
	// First try to re-enable disabled servers; start new Pods only if that is not enough.
	disableServers := GetServersFilter(servers, func(server *dao.TblU3Server) bool {
		return !server.Enabled
	})
	sort.Slice(disableServers, func(i, j int) bool {
		return disableServers[i].CreateTime < disableServers[j].CreateTime
	})
	db := db2.Db(context.Background())
	if len(disableServers) < count {
		entities := make([]*dao.TblU3Server, 0)
		// Start new Pods.
		startCount := count - len(disableServers)
		for i := 0; i < startCount; i++ {
			hostname := getNextPodName(nameSet, deploy.ServiceName, deploy.ImageVersion, i)
			serviceName := deploy.ServiceName
			version := deploy.ImageVersion
			rollId := getRollId(rollIdMap, getSv(serviceName, version))
			server := dao.BuildU3Server(hostname, rollId, serviceName, version)
			// The record is saved even if startPod fails; without a heartbeat it is
			// later cleaned up through the no-heartbeat path.
			entities = append(entities, server)
			if err := startPod(server, deploy); err != nil {
				log.Error("startPod failed as err:%v", err)
				continue
			}
		}
		if err := dao.UpsertU3servers(db, entities); err != nil {
			log.Error("UpsertU3servers failed as err:%v", err)
		}
	}
	num := math.Min(len(disableServers), count)
	ids := make([]int, num)
	for i := 0; i < num; i++ {
		// Update the in-memory record; the database is updated in one batch below.
		disableServers[i].Enabled = true
		modifyServers(servers, disableServers[i])
		ids[i] = disableServers[i].Id
	}
	err := dao.UpdateServersEnabled(db, ids)
	if err != nil {
		log.Error("UpdateServersEnabled failed as err:%v", err)
	}
}

func dealDisablePod(log logger.ILog, servers []*dao.TblU3Server, count int) {
	enableServers := GetServersFilter(servers, func(server *dao.TblU3Server) bool {
		return server.Enabled
	})
	// Fewest users first; on a tie, the most recently created first.
	sort.Slice(enableServers, func(i, j int) bool {
		if enableServers[i].UserCount == enableServers[j].UserCount {
			return enableServers[i].CreateTime > enableServers[j].CreateTime
		} else {
			return enableServers[i].UserCount < enableServers[j].UserCount
		}
	})
	// count includes just-created servers, so it can exceed the number of
	// enabled servers; clamp it to avoid indexing past the slice.
	count = math.Min(count, len(enableServers))
	ids := make([]int, count)
	for i := 0; i < count; i++ {
		// Update the in-memory record; the database is updated in one batch below.
		enableServers[i].Enabled = false
		modifyServers(servers, enableServers[i])
		ids[i] = enableServers[i].Id
	}
	db := db2.Db(context.Background())
	err := dao.UpdateServersDisabled(db, ids)
	if err != nil {
		log.Error("UpdateServersDisabled failed as err:%v", err)
	}
}

func dealDeletePod(log logger.ILog, serversForDelete []*dao.TblU3Server) {
	sd := make([]*dao.TblU3Server, 0)
	// Delete the Pods.
	for _, server := range serversForDelete {
		log.Debug("releasePod start server:%s", server)
		if err := releasePod(server); err != nil {
			log.Error("releasePod failed as err:%v", err)
		}
		// Call back bbs; failures are not retried for now.
		if err := rpcBbsServerDelete(server.Hostname); err != nil {
			log.Error("rpcBbsServerDelete failed as err:%v", err)
		}
		sd = append(sd, server)
		log.Debug("releasePod end server:%s", server)
	}
	db := db2.Db(context.Background())
	if err := dao.DeleteServers(db, sd); err != nil {
		log.Error("DeleteServers failed as err:%v", err)
	}
}

// getHostNameSet adds every known hostname to the package-level nameSet.
func getHostNameSet(servers []*dao.TblU3Server) *_set.Set[string] {
	for _, s := range servers {
		nameSet.Add(s.Hostname)
	}
	return nameSet
}

// getRollIdMap maps serviceName+version to its roll ID.
func getRollIdMap(servers []*dao.TblU3Server) map[string]string {
	rollIdMap := make(map[string]string, 0)
	for _, s := range servers {
		sv := getSv(s.ServiceName, s.Version)
		rollIdMap[sv] = s.RollId
	}
	return rollIdMap
}

func dealSvAndRelease(log logger.ILog, servers []*dao.TblU3Server, svSet *_set.Set[string], rsSet *_set.Set[int]) []*dao.TblU3Server {
	ids := make([]int, 0)
	serversForDelete := make([]*dao.TblU3Server, 0)
	for _, server := range servers {
		sv := getSv(server.ServiceName, server.Version)
		if !svSet.Has(sv) {
			// Disable this server only if another server of the same service is still enabled.
			if server.Enabled && hasOtherEnabledServer(servers, server.ServiceName) {
				ids = append(ids, server.Id)
				server.Enabled = false
			}
			if server.IsReadyRelease() {
				rsSet.Add(server.Id)
			}
			if server.CanDelete() {
				serversForDelete = append(serversForDelete, server)
			}
		}
	}
	db := db2.Db(context.Background())
	if err := dao.UpdateServersDisabled(db, ids); err != nil {
		log.Error("UpdateServersDisabled failed as err:%v", err)
	}
	if err := dao.UpdateServersRelease(db, rsSet.Slice()); err != nil {
		log.Error("UpdateServersRelease failed as err:%v", err)
	}
	return serversForDelete
}

func dealCanDelete(servers []*dao.TblU3Server) []*dao.TblU3Server {
	serversForDelete := make([]*dao.TblU3Server, 0)
	for _, s := range servers {
		if s.CanDelete() {
			serversForDelete = append(serversForDelete, s)
		}
	}
	return serversForDelete
}

func dealCanRelease(servers []*dao.TblU3Server, rsSet *_set.Set[int]) {
	for _, server := range servers {
		if server.IsReadyRelease() {
			rsSet.Add(server.Id)
		}
	}
}
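
// modifyServers replaces the entry that has the same Id as server.
func modifyServers(servers []*dao.TblU3Server, server *dao.TblU3Server) {
	for i, s := range servers {
		if s.Id == server.Id {
			// Assign through the index; assigning to the loop variable has no effect.
			servers[i] = server
		}
	}
}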

// getSv builds the "serviceName_version" key.
func getSv(serviceName, version string) string {
	return fmt.Sprintf("%s_%s", serviceName, version)
}

func rpcBbsServerDelete(hostname string) error {
	bbsUrl := config.Config.BbsUrl + "/upime3/serverDelete"
	body := map[string]any{
		"hostname": hostname,
	}
	if err := http_util.PlasoPost(bbsUrl, body, nil); err != nil {
		return err
	}
	return nil
}

func getServiceNameEnableCount(servers []*dao.TblU3Server, serviceName string) int {
	count := 0
	for _, server := range servers {
		if server.ServiceName == serviceName && server.Enabled {
			count++
		}
	}
	return count
}

func hasOtherEnabledServer(servers []*dao.TblU3Server, serviceName string) bool {
	return getServiceNameEnableCount(servers, serviceName) > 1
}

func DeleteU3Server(hostname string) {
	log := logger.LogPrefix(fmt.Sprintf("DeleteU3Server [hostname: %s]", hostname))
	db := db2.Db(context.Background())
	// Log success only when the step actually succeeded.
	if err := releasePodDirect(hostname); err != nil {
		log.Error("release pod failed, err: %s", err)
	} else {
		log.Debug("release pod success")
	}
	if err := dao.DeleteServerByHostname(db, hostname); err != nil {
		log.Error("DeleteServerByHostname failed as err: %s", err)
	} else {
		log.Debug("delete db record success")
	}
	if err := rpcBbsServerDelete(hostname); err != nil {
		log.Error("rpcBbsServerDelete failed as err: %s", err)
	} else {
		log.Debug("rpc u3balance success")
	}
}

// filterValidDeploys keeps deployments whose serviceName+version contains only
// lowercase letters, digits, '.' and '-'.
func filterValidDeploys(deploys []*dao.TblDeploy) []*dao.TblDeploy {
	r := regexp.MustCompile(`^[0-9a-z.-]+$`)
	res := make([]*dao.TblDeploy, 0, len(deploys))
	for _, deploy := range deploys {
		name := deploy.ServiceName + deploy.ImageVersion
		if r.MatchString(name) {
			res = append(res, deploy)
		} else {
			logger.Error("invalid serviceName or version, name: %s", name)
		}
	}
	return res
}
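
The disable policy in dealDisablePod (fewest users first, newest first on a tie) can be checked in isolation. The sketch below uses a hypothetical, trimmed-down server struct instead of dao.TblU3Server and returns hostnames rather than updating a database; pickToDisable is an illustrative name.

```go
package main

import (
	"fmt"
	"sort"
)

// server is a hypothetical stand-in for dao.TblU3Server.
type server struct {
	Hostname   string
	UserCount  int
	CreateTime int64 // Unix milliseconds
}

// pickToDisable returns up to count hostnames, ordered by fewest users first
// and, for equal user counts, by most recently created first.
func pickToDisable(servers []server, count int) []string {
	sorted := append([]server(nil), servers...) // copy so the input is untouched
	sort.SliceStable(sorted, func(i, j int) bool {
		if sorted[i].UserCount != sorted[j].UserCount {
			return sorted[i].UserCount < sorted[j].UserCount
		}
		return sorted[i].CreateTime > sorted[j].CreateTime
	})
	// Clamp count to the valid range before slicing.
	if count > len(sorted) {
		count = len(sorted)
	}
	if count < 0 {
		count = 0
	}
	names := make([]string, 0, count)
	for _, s := range sorted[:count] {
		names = append(names, s.Hostname)
	}
	return names
}

func main() {
	servers := []server{
		{"u3-0", 12, 1000},
		{"u3-1", 0, 2000},
		{"u3-2", 0, 3000},
	}
	fmt.Println(pickToDisable(servers, 2)) // [u3-2 u3-1]
}
```

Disabling idle, recently created servers first keeps long-running servers with active users in service.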
4.5 server.go
Database model and operations for the server table.
package dao

import (
	"time"

	"gorm.io/gorm"

	"support/util"
)

type TblU3Server struct {
	Id          int    `gorm:"column:id"`
	RollId      string `gorm:"column:rollId"`
	Version     string `gorm:"column:version"`
	HbTime      int64  `gorm:"column:hbTime"`
	CreateTime  int64  `gorm:"column:createTime"`
	Enabled     bool   `gorm:"column:enabled"`
	Hostname    string `gorm:"column:hostname"`
	ServiceName string `gorm:"column:serviceName"`
	ReleaseTime int64  `gorm:"column:releaseTime"`
	NodeCount   int    `gorm:"column:nodeCount"`
	UserCount   int    `gorm:"column:userCount"`
}

func (us *TblU3Server) String() string {
	return util.ConvertToJsonStr(us)
}

const tblU3Server = "tbl_u3_server"

func GetU3Server(db *gorm.DB) ([]*TblU3Server, error) {
	var res []*TblU3Server
	err := db.Table(tblU3Server).Find(&res).Error
	return res, err
}

func BuildU3Server(hostname, rollId, serviceName, version string) *TblU3Server {
	return &TblU3Server{
		Hostname:    hostname,
		RollId:      rollId,
		CreateTime:  util.NowMs(),
		ServiceName: serviceName,
		Version:     version,
	}
}

func UpsertU3servers(db *gorm.DB, entities []*TblU3Server) error {
	if len(entities) == 0 {
		return nil
	}
	return db.Table(tblU3Server).Save(entities).Error
}

func (us *TblU3Server) SetForDelete() {
	us.Enabled = false
	us.UserCount = 0
	us.ReleaseTime = util.NowMs()
}

// IsNoHeart reports a heartbeat older than 1 minute on a server created more than 2 minutes ago.
func (us *TblU3Server) IsNoHeart() bool {
	now := util.NowMs()
	f1 := now-us.HbTime > time.Minute.Milliseconds()
	f2 := now-us.CreateTime > 2*time.Minute.Milliseconds()
	return f1 && f2
}

// CanDelete reports a disabled, empty server whose release time has passed.
func (us *TblU3Server) CanDelete() bool {
	now := util.NowMs()
	return !us.Enabled && us.UserCount == 0 && us.ReleaseTime > 0 && us.ReleaseTime < now
}

// IsJustCreating reports a server created less than 2 minutes ago that has not sent a heartbeat yet.
func (us *TblU3Server) IsJustCreating() bool {
	now := util.NowMs()
	return us.HbTime == 0 && now-us.CreateTime < 2*time.Minute.Milliseconds()
}

// IsReadyRelease reports a disabled, empty server that has no release time yet.
func (us *TblU3Server) IsReadyRelease() bool {
	return !us.Enabled && us.UserCount == 0 && us.ReleaseTime == 0
}

func UpdateServersEnabled(db *gorm.DB, ids []int) error {
	if len(ids) == 0 {
		return nil
	}
	return db.Table(tblU3Server).
		Where("id in ?", ids).
		Updates(map[string]any{"enabled": true, "releaseTime": 0}).Error
}

func UpdateServersDisabled(db *gorm.DB, ids []int) error {
	if len(ids) == 0 {
		return nil
	}
	return db.Table(tblU3Server).
		Where("id in ?", ids).
		Updates(map[string]any{"enabled": false}).Error
}

func DeleteServers(db *gorm.DB, entities []*TblU3Server) error {
	if len(entities) == 0 {
		return nil
	}
	return db.Table(tblU3Server).Delete(&entities).Error
}

func DeleteServerByHostname(db *gorm.DB, hostname string) error {
	return db.Table(tblU3Server).
		Where("hostname = ?", hostname).
		Delete(nil).Error
}

// UpdateServersRelease schedules the release one minute from now.
func UpdateServersRelease(db *gorm.DB, ids []int) error {
	if len(ids) == 0 {
		return nil
	}
	return db.Table(tblU3Server).Where("id in ?", ids).
		Update("releaseTime", time.Now().Add(time.Minute).UnixMilli()).Error
}
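
The heartbeat thresholds in IsNoHeart and IsJustCreating are easier to reason about when the clock is passed in explicitly. The sketch below applies the same comparisons; the function name state, the nowMs parameter, and the three labels are assumptions for illustration, since the original methods read the clock internally and return booleans.

```go
package main

import (
	"fmt"
	"time"
)

// state classifies a server record with the same thresholds as IsNoHeart and
// IsJustCreating. All arguments are Unix milliseconds.
func state(hbTimeMs, createTimeMs, nowMs int64) string {
	minute := time.Minute.Milliseconds()
	switch {
	case hbTimeMs == 0 && nowMs-createTimeMs < 2*minute:
		// Created recently and no heartbeat yet: still starting up.
		return "just-creating"
	case nowMs-hbTimeMs > minute && nowMs-createTimeMs > 2*minute:
		// Heartbeat is stale and the creation grace period is over.
		return "no-heartbeat"
	default:
		return "alive"
	}
}

func main() {
	now := time.Now().UnixMilli()
	fmt.Println(state(0, now-30_000, now))           // just-creating
	fmt.Println(state(0, now-180_000, now))          // no-heartbeat
	fmt.Println(state(now-10_000, now-300_000, now)) // alive
}
```

The two-minute creation window is what keeps a freshly started Pod from being deleted before it has had a chance to report its first heartbeat.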
4.6 deploy.go
The deploy table, which controls how many Pods are needed.
package dao

import (
	"gorm.io/gorm"

	"support/util"
)

const tblDeploy = "tbl_deploy"

//const tblDeploy = "tbl_deploy_copy1"

type TblDeploy struct {
	MinCount     int    `gorm:"column:minCount"`
	BakCount     int    `gorm:"column:bakCount"`
	AvgUser      int    `gorm:"column:avgUser"`
	AvgMeeting   int    `gorm:"column:avgMeeting"`
	Name         string `gorm:"column:name"`
	ImageVersion string `gorm:"column:imageVersion"`
	ServiceName  string `gorm:"column:serviceName"`
	ImageName    string `gorm:"column:imageName"`
	LaunchCmd    string `gorm:"column:launchCmd"`
}

func (us *TblDeploy) String() string {
	return util.ConvertToJsonStr(us)
}

func QueryDeploy(db *gorm.DB) ([]*TblDeploy, error) {
	var deployInfos []*TblDeploy
	err := db.Table(tblDeploy).Find(&deployInfos).Error
	return deployInfos, err
}
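5. Code walkthrough
The code avoids Pod name collisions by tracking the names already in use.
A regular expression filters out Pod names that do not meet the naming rules.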
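Since startPod uses the generated name both as the Pod name and as spec.hostname, the final name has to be a valid DNS label: lowercase alphanumerics and '-', starting and ending with an alphanumeric, at most 63 characters. The sketch below checks that stricter rule on the finished name. It is not the article's filterValidDeploys, which checks serviceName+version with `^[0-9a-z.-]+$` before the dots are stripped; validPodHostname is a hypothetical helper.

```go
package main

import (
	"fmt"
	"regexp"
)

// dnsLabel matches an RFC 1123 label: lowercase alphanumerics and '-',
// starting and ending with an alphanumeric character.
var dnsLabel = regexp.MustCompile(`^[a-z0-9]([-a-z0-9]*[a-z0-9])?$`)

// validPodHostname reports whether name fits the DNS label rules and the
// 63-character limit.
func validPodHostname(name string) bool {
	return len(name) <= 63 && dnsLabel.MatchString(name)
}

func main() {
	for _, name := range []string{"u3120-0", "U3-0", "u3_1", "-u3"} {
		fmt.Printf("%q valid: %v\n", name, validPodHostname(name))
	}
}
```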