前端js

<!DOCTYPE html>
<html lang="en">
<head>
    <meta charset="UTF-8">
    <title>Title</title>
</head>
<body>
<div id="result">

</div>

<div>
    <button type="button" onclick="submit()">提交!</button>
</div>
</body>
<script>
    function submit() {

        const stream = new EventSource("http://localhost:8080/stream?namespace=dev&podName=xxxx&containerName=xxxxxx")
        stream.addEventListener("message", function (e) {
           console.log(e)
           document.getElementById("result").innerHTML += e.data
        });
        stream.addEventListener("stop", function (e) {
            stream.close()
            console.log(e)
        });
        stream.onerror = function (event) {
            stream.close()
            console.log(event)
        }
    }

</script>
</html>

后端go代码

package main

import (
	"bufio"
	"context"
	"errors"
	"fmt"
	"github.com/gin-gonic/gin"
	cors "github.com/rs/cors/wrapper/gin"
	"io"
	v1 "k8s.io/api/core/v1"
	"k8s.io/client-go/kubernetes"
	"k8s.io/client-go/tools/clientcmd"
	"log"
	"net/http"
	"os"
	"os/signal"
	"time"
)

const kubeconfig = `
`

type SteamRequest struct {
	Namespace     string `form:"namespace" binding:"required"`
	PodName       string `form:"podName" binding:"required"`
	ContainerName string `form:"containerName" binding:"required"`
}

func setupRouter() *gin.Engine {
	r := gin.Default()

	r.Use(cors.New(cors.Options{
		AllowedOrigins: []string{"*"},
		AllowedMethods: []string{
			http.MethodHead,
			http.MethodGet,
			http.MethodPost,
			http.MethodPut,
			http.MethodPatch,
			http.MethodDelete,
		},
		AllowedHeaders:     []string{"*"},
		AllowCredentials:   true,
		OptionsPassthrough: true,
	}))

	r.GET("/stream", func(c *gin.Context) {
		var request SteamRequest
		err := c.ShouldBindQuery(&request)
		if err != nil {
			log.Fatal(err.Error())
		}
		log.Printf("request: %v \n", request)

		clientConfig, err := clientcmd.NewClientConfigFromBytes([]byte(kubeconfig))
		if err != nil {
			panic(err.Error())
		}
		config, err := clientConfig.ClientConfig()
		if err != nil {
			panic(err.Error())
		}
		// create the clientset
		clientset, err := kubernetes.NewForConfig(config)
		if err != nil {
			panic(err.Error())
		}
		var lines int64 = 100
		stream, err := clientset.CoreV1().Pods(request.Namespace).GetLogs(request.PodName, &v1.PodLogOptions{
			Container:    request.ContainerName,
			Follow:       true,
			Timestamps:   false,
			SinceSeconds: nil,
			SinceTime:    nil,
			TailLines:    &lines,
			Previous:     false,
		}).Stream(context.Background())

		if err != nil {
			return
		}

		chanStream := make(chan string, 100)
		go func() {
			defer stream.Close()
			defer close(chanStream)
			bufReader := bufio.NewReader(stream)
			for {
				line, err := bufReader.ReadString('\n')

				if errors.Is(err, io.EOF) {
					fmt.Println("Stream finished")
					chanStream <- "<!finish>"
					return
				}

				if err != nil {
					fmt.Printf("Stream error: %v\n", err)
					chanStream <- "<!error>"
					return
				}
				chanStream <- line
				fmt.Printf("Stream response: %v\n", line)
			}
		}()

		c.Stream(func(w io.Writer) bool {
			if msg, ok := <-chanStream; ok {
				if msg == "<!finish>" {
					c.SSEvent("stop", "finish")
				}
				if msg == "<!error>" {
					c.SSEvent("stop", "error")
				}
				c.SSEvent("message", msg)
				fmt.Printf("message: %v\n", msg)

				return true
			}
			return false
		})

	})
	return r
}

func main() {
	gin.SetMode(gin.ReleaseMode)
	router := setupRouter()

	srv := &http.Server{
		Addr:    ":8080",
		Handler: router,
	}
	go func() {
		// 服务连接
		if err := srv.ListenAndServe(); err != nil && err != http.ErrServerClosed {
			log.Fatalf("listen: %s\n", err)
		}
	}()

	// 等待中断信号以优雅地关闭服务器(设置 5 秒的超时时间)
	quit := make(chan os.Signal)
	signal.Notify(quit, os.Interrupt)
	<-quit
	log.Println("Shutdown Server ...")

	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
	defer cancel()
	if err := srv.Shutdown(ctx); err != nil {
		log.Fatal("Server Shutdown:", err)
	}
	log.Println("Server exiting")
}

Logo

K8S/Kubernetes社区为您提供最前沿的新闻资讯和知识内容

更多推荐