使用 Gin 与 EventSource(SSE)实现 K8s Pod 日志的实时刷新
【代码】gin event source实现k8s pod logs 实时刷新。
·
前端代码(HTML + JavaScript):
<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="UTF-8">
<title>Title</title>
</head>
<body>
<div id="result">
</div>
<div>
<button type="button" onclick="submit()">提交!</button>
</div>
</body>
<script>
/**
 * Opens a Server-Sent Events connection to the backend /stream endpoint and
 * appends every received log chunk to the #result element.
 *
 * The connection is closed when the server sends a "stop" event or when the
 * EventSource reports an error (which also disables automatic reconnects).
 */
function submit() {
    const result = document.getElementById("result")
    const stream = new EventSource("http://localhost:8080/stream?namespace=dev&podName=xxxx&containerName=xxxxxx")
    stream.addEventListener("message", function (e) {
        console.log(e)
        // Pod log content is untrusted: append it as a text node so it is never
        // parsed as HTML, and so the existing content is not re-parsed on
        // every message (which `innerHTML +=` would do).
        result.appendChild(document.createTextNode(e.data))
    });
    stream.addEventListener("stop", function (e) {
        stream.close()
        console.log(e)
    });
    stream.onerror = function (event) {
        // Closing here prevents the browser from reconnecting automatically.
        stream.close()
        console.log(event)
    }
}
</script>
</html>
后端 Go 代码:
package main
import (
"bufio"
"context"
"errors"
"fmt"
"github.com/gin-gonic/gin"
cors "github.com/rs/cors/wrapper/gin"
"io"
v1 "k8s.io/api/core/v1"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/tools/clientcmd"
"log"
"net/http"
"os"
"os/signal"
"time"
)
// kubeconfig holds the kubeconfig document that the /stream handler parses
// with clientcmd.NewClientConfigFromBytes to build the Kubernetes client.
// NOTE(review): the value here is only a placeholder (a single newline);
// presumably real cluster credentials must be filled in before use — confirm.
const kubeconfig = `
`
// SteamRequest carries the query parameters of the /stream endpoint. The
// handler fills it with ShouldBindQuery; every field is marked required by
// its binding tag.
// NOTE(review): the name looks like a typo for "StreamRequest"; it is kept
// as-is because renaming would change the exported identifier.
type SteamRequest struct {
// Namespace is bound from the "namespace" query parameter.
Namespace string `form:"namespace" binding:"required"`
// PodName is bound from the "podName" query parameter.
PodName string `form:"podName" binding:"required"`
// ContainerName is bound from the "containerName" query parameter.
ContainerName string `form:"containerName" binding:"required"`
}
// SSE event names emitted by the /stream endpoint.
const (
	eventMessage = "message" // one chunk (normally one line) of pod log output
	eventStop    = "stop"    // terminal event; data is "finish" or "error"
)

// logEvent is one item handed from the log-reading goroutine to the SSE writer.
type logEvent struct {
	name string // SSE event name (eventMessage or eventStop)
	data string // SSE event payload
}

// setupRouter builds the gin engine: it installs the CORS middleware and
// registers GET /stream, which streams a pod's logs as Server-Sent Events.
func setupRouter() *gin.Engine {
	r := gin.Default()
	// NOTE(review): a wildcard origin combined with AllowCredentials is
	// rejected by browsers for credentialed requests; the options are kept
	// unchanged here — confirm whether credentials are actually needed.
	r.Use(cors.New(cors.Options{
		AllowedOrigins: []string{"*"},
		AllowedMethods: []string{
			http.MethodHead,
			http.MethodGet,
			http.MethodPost,
			http.MethodPut,
			http.MethodPatch,
			http.MethodDelete,
		},
		AllowedHeaders:     []string{"*"},
		AllowCredentials:   true,
		OptionsPassthrough: true,
	}))
	r.GET("/stream", streamPodLogs)
	return r
}

// newClientset builds a Kubernetes clientset from the package-level kubeconfig.
// It returns a wrapped error instead of panicking so that a broken
// configuration fails a single request rather than relying on panic recovery.
func newClientset() (*kubernetes.Clientset, error) {
	clientConfig, err := clientcmd.NewClientConfigFromBytes([]byte(kubeconfig))
	if err != nil {
		return nil, fmt.Errorf("parsing kubeconfig: %w", err)
	}
	config, err := clientConfig.ClientConfig()
	if err != nil {
		return nil, fmt.Errorf("building client config: %w", err)
	}
	clientset, err := kubernetes.NewForConfig(config)
	if err != nil {
		return nil, fmt.Errorf("creating clientset: %w", err)
	}
	return clientset, nil
}

// streamPodLogs handles GET /stream. It follows the logs of the requested
// container (starting with the last 100 lines) and forwards them to the client
// as "message" events. When the log stream ends it sends a single "stop" event
// whose data is "finish" (clean end) or "error" (read failure).
//
// Invalid query parameters yield 400; failures to build the client or open the
// log stream yield 500/502. The request never terminates the server process.
func streamPodLogs(c *gin.Context) {
	var request SteamRequest
	if err := c.ShouldBindQuery(&request); err != nil {
		// A malformed request must not take the whole server down.
		c.JSON(http.StatusBadRequest, gin.H{"error": err.Error()})
		return
	}
	log.Printf("request: %v \n", request)

	clientset, err := newClientset()
	if err != nil {
		log.Printf("stream: %v", err)
		c.JSON(http.StatusInternalServerError, gin.H{"error": "kubernetes client unavailable"})
		return
	}

	// Tie the upstream log stream to the request: when the client disconnects
	// (or the handler returns) the context is cancelled, which aborts the
	// Kubernetes request and lets the reader goroutine exit.
	ctx := c.Request.Context()

	var lines int64 = 100
	stream, err := clientset.CoreV1().Pods(request.Namespace).GetLogs(request.PodName, &v1.PodLogOptions{
		Container: request.ContainerName,
		Follow:    true,
		TailLines: &lines,
	}).Stream(ctx)
	if err != nil {
		log.Printf("stream: opening logs for %s/%s: %v", request.Namespace, request.PodName, err)
		c.JSON(http.StatusBadGateway, gin.H{"error": "unable to open pod log stream"})
		return
	}

	events := make(chan logEvent, 100)
	go func() {
		defer stream.Close()
		defer close(events)

		// send delivers ev unless the request is cancelled first, so the
		// goroutine can never block forever on a channel nobody drains.
		send := func(ev logEvent) bool {
			select {
			case events <- ev:
				return true
			case <-ctx.Done():
				return false
			}
		}

		reader := bufio.NewReader(stream)
		for {
			line, err := reader.ReadString('\n')
			// ReadString may return data together with an error (e.g. a final
			// line without a trailing newline); forward it before stopping.
			if line != "" && !send(logEvent{name: eventMessage, data: line}) {
				return
			}
			if err == nil {
				continue
			}
			switch {
			case ctx.Err() != nil:
				// Client went away; nothing left to report.
			case errors.Is(err, io.EOF):
				log.Println("Stream finished")
				send(logEvent{name: eventStop, data: "finish"})
			default:
				log.Printf("Stream error: %v\n", err)
				send(logEvent{name: eventStop, data: "error"})
			}
			return
		}
	}()

	c.Stream(func(w io.Writer) bool {
		select {
		case <-ctx.Done():
			return false
		case ev, ok := <-events:
			if !ok {
				return false
			}
			c.SSEvent(ev.name, ev.data)
			// "stop" is terminal: end the response right after sending it.
			return ev.name != eventStop
		}
	})
}
// main starts the HTTP server on :8080 and shuts it down gracefully (with a
// 5 second limit) once the process receives an interrupt signal.
func main() {
	gin.SetMode(gin.ReleaseMode)
	router := setupRouter()
	srv := &http.Server{
		Addr:    ":8080",
		Handler: router,
		// Bound the time a client may take to send request headers.
		// WriteTimeout is deliberately left unset: /stream responses are
		// long-lived and would be cut off by it.
		ReadHeaderTimeout: 10 * time.Second,
	}
	go func() {
		// Serve connections until Shutdown is called.
		if err := srv.ListenAndServe(); err != nil && !errors.Is(err, http.ErrServerClosed) {
			log.Fatalf("listen: %s\n", err)
		}
	}()

	// Wait for an interrupt signal. The channel must be buffered: the signal
	// package sends without blocking, so a signal arriving while nobody is
	// receiving on an unbuffered channel would be dropped.
	quit := make(chan os.Signal, 1)
	signal.Notify(quit, os.Interrupt)
	<-quit
	log.Println("Shutdown Server ...")

	// Give in-flight requests up to 5 seconds to complete.
	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
	err := srv.Shutdown(ctx)
	// Release the timer explicitly; log.Fatal below would skip a deferred call.
	cancel()
	if err != nil {
		log.Fatal("Server Shutdown:", err)
	}
	log.Println("Server exiting")
}
更多推荐
已为社区贡献1条内容
所有评论(0)