go的语言特性符合字节业务场景

​ go天生支持高并发,符合字节跳动这种流媒体的业务场景,而JAVA的优势是生态比较强大,从web应用到大数据、云计算,JAVA都有成熟的解决方案。而go的性能比较高,又可以兼容C语言编写的程序,与C无缝衔接。就像scala兼容Java一样。而Java程序运行在虚拟机上,这一点就限制了java的性能。java为了应对高并发专一弄了一个juc的库。综合分析下来,go符合字节的业务,技术选型当然首选go了。当然,这里并不是说java语言不好,java有他自身的魅力和优势。

​ go诞生于Google公司的工程师,是出自c语言之父之手,底层是拿c开发的,所以可想而知go具备高性能和天生支持高并发的特性。随着go的兴起,go语言的生态也在不断完善…,像docker,k8s,底层都是拿go语言实现的。所以java有它的优势,go也有它的优势,没有谁比谁更好这一说,只有合适。

​ 那为什么说go语言天生支持高并发呢?因为go语言中引入了协程的概念,也就是goroutine,协程是比线程更小的执行单元。我们可以理解进程是计算机资源分配的最小单元,也是CPU分配资源的最小单位,具有独立的内存。而线程是计算机调度的最小单元,也是程序执行的最小单元,一个进程中会有多个线程。要想实现高并发,就是基于CPU的分时调度模型,CPU在不同时间点来回切换执行,那么java的多线程操作,也是基于此,CPU在不同的时间点来回切换,但是在同一时间点只能有一个线程工作,另外的线程处于阻塞状态,由于切换较快看上去似乎是同一时间。go协程比线程轻量,cpu的开销小。而且在 Go 语言中,有一个单独的调度器(Scheduler)负责调度和管理 Goroutine 的执行。调度器会自动将 Goroutine 分配给可用的 CPU 核心,并根据需要进行抢占式调度,以实现并发的执行。这使得编写并发程序变得更加简单,无需手动管理线程的创建和调度。因此go语言在底层就实现了高并发。

gin框架实现一个webSocket聊天功能的案例

定义一个连接websocket的启动函数

package service

import (
	//"chat/conf"
	"xxx/src/config/e"
	"xxx/src/src/utils"
	"encoding/json"
	_ "fmt"
	//"github.com/gin-gonic/gin/binding"

	"github.com/gorilla/websocket"
	"log"
)

func (manager *ClientManager) WebSocketStart() {
	for {
		log.Println("<---监听管道通信--->")
		select {
		case conn := <-Manager.Register: // 建立连接
			log.Printf("建立新连接: %v", conn.conversation.Uid)
			Manager.Clients[utils.Strval(conn.conversation.Uid)] = conn
			replyMsg := &ReplyMsg{
				Code:    e.WebsocketSuccess,
				Content: "已连接至服务器",
			}
			//这里调用记忆文件方法

			msg, _ := json.Marshal(replyMsg)
			_ = conn.Socket.WriteMessage(websocket.TextMessage, msg)

		case conn := <-Manager.Unregister: // 断开连接
			log.Printf("连接失败:%v", conn.conversation.Uid)
			if _, ok := Manager.Clients[utils.Strval(conn.conversation.Uid)]; ok {
				replyMsg := &ReplyMsg{
					Code:    e.WebsocketEnd,
					Content: "连接已断开",
				}
				msg, _ := json.Marshal(replyMsg)
				_ = conn.Socket.WriteMessage(websocket.TextMessage, msg)
				close(conn.Send)
				delete(Manager.Clients, utils.Strval(conn.conversation.Uid))
			}
			//广播信息

		}
		//
	}
}

定义一个chat聊天服务的程序

package service

import (
	"xxx/src/config"
	"xxx/src/config/e"
	"xxx/src/src/controllers"
	"xxx/src/datasource"
	"xxx/src/models"
	"xxx/src/utils"
	"github.com/gin-contrib/sessions"
	"os"
	"strconv"
	"time"

	//"chat/cache"
	//"chat/conf"
	"encoding/json"
	"fmt"
	"github.com/gin-gonic/gin"
	"github.com/gorilla/websocket"
	"log"
	"net/http"
	//"strconv"
	//"time"
)

//const month = 60 * 60 * 24 * 30 // 按照30天算一个月

// 发送消息的类型
type SendMsg struct {
	Type    int    `json:"type"`
	Content string `json:"content"`
}

// 回复的消息
/* 这个要注释掉 */
type ReplyMsg struct {
	From    string `json:"from"`
	Code    int    `json:"code"`
	Content string `json:"content"`
}

type HuaSoulReplyMsg struct {
	Code   int                    `json:"code"`
	Status map[string]interface{} `json:"status"`
	ISLIU  string                 `json:"ISLIU"`
}

/*
构建Client表
*/
//Id|Uid|Did|title|descript
type Client struct {
	conversation *models.Conversation
	Socket       *websocket.Conn
	Send         chan []byte
}

// 用户类
/*type Client struct {
	ID     string
	SendID string
	Socket *websocket.Conn
	Send   chan []byte
}*/

// 广播类,包括广播内容和源用户
/*type Broadcast struct {
	Client  *Client
	Message []byte
	Type    int
}
*/

// 用户管理
type ClientManager struct {
	Clients map[string]*Client
	//Broadcast  chan *Broadcast
	Reply      chan *Client
	Register   chan *Client
	Unregister chan *Client
}

// Message 信息转JSON (包括:发送者、接收者、内容)
type Message struct {
	Sender    string `json:"sender,omitempty"`
	Recipient string `json:"recipient,omitempty"`
	Content   string `json:"content,omitempty"`
}

var Manager = ClientManager{
	Clients: make(map[string]*Client), // 参与连接的用户,出于性能的考虑,需要设置最大连接数
	//Broadcast:  make(chan *Broadcast),
	Register:   make(chan *Client),
	Reply:      make(chan *Client),
	Unregister: make(chan *Client),
}

func WsHandler(ctx *gin.Context) {

	/*
	 http协议升级为wobSocket协议
	*/
	conn, err := (&websocket.Upgrader{
		CheckOrigin: func(r *http.Request) bool { // CheckOrigin解决跨域问题
			return true
		}}).Upgrade(ctx.Writer, ctx.Request, nil) // 升级成ws协议

	if err != nil {
		http.NotFound(ctx.Writer, ctx.Request)
		return
	}
	/*
	   从session里获取用户id
	*/

	session := sessions.Default(ctx)
	userId := session.Get("UserID")
	if userId == nil {
		// 处理session中未找到值的情况,返回错误信息
		/*这里是双保险为了测试用*/
		userId = ctx.Query("id")
		if userId == nil {
			ctx.JSON(http.StatusOK, gin.H{"code": 0, "message": "Session中未找到用户ID,用户未登录"})
			return
		}
	}
	user_Id, ok := userId.(string)
	if !ok {
		// 类型断言失败,处理类型不匹配的情况
		ctx.JSON(http.StatusOK, gin.H{"code": 0, "message": "用户ID类型错误"})
		return
	}
	//userId作为string类型的值进行后续操作
	_, b := controllers.CacheCode.Get(user_Id)

	uid, err := strconv.Atoi(user_Id)
	if err != nil {
		fmt.Println("用户id转换失败:", err)
		return
	}

	//从session中获取用户id
	/*
	   判断用户是否登录
	*/

	if b {
		digital_Id := ctx.Query("DpId")
		dp_Id, _ := strconv.Atoi(digital_Id)
		//查询数据库,根据数字人Id查询数字人信息
		digitalPerson, err := models.GetDigitalPersonByID(int64(dp_Id))
		if err != nil {
			ctx.JSON(http.StatusOK, gin.H{"code": 0, "message": "用户与数字人创建会话时,查询数字人失败"})
			log.Println(err.Error())
			return
		}
		fmt.Println(digitalPerson)

		ctx.JSON(http.StatusOK, gin.H{"code": 1, "message": "用户与数字人创建会话时,查询数字人", "digitalPerson": digitalPerson})

		// 根据用户id和数字人id,查询会话表中是否有会话记录
		conversation, err := models.GetConversationsByUserIdAndDpId(int64(uid), int64(dp_Id))
		if err != nil {
			ctx.JSON(http.StatusOK, gin.H{"code": 0, "message": "查询会话记录失败!"})
			return
		}

		var (
			//conversation models.Conversation
			ISULU_ID string
		)

		if conversation == nil {
			// 如果查询不到记录,则插入一条新的会话记录
			//创建调用AI接口实例
			apiCode, ISULU_ID, err := HuaSoulNewAPI(int64(dp_Id))

			if err != nil {
				ctx.JSON(http.StatusOK, gin.H{"code": 0, "content": "数字人会话创建连接,失败!"})
			}
			if apiCode != 1 && ISULU_ID == "" {
				ctx.JSON(http.StatusOK, gin.H{"code": 0, "content": "创建调用数字人会话接口实例,失败!"})

			}
			fmt.Println(ISULU_ID)

			conversation = &models.Conversation{
				Uid:     int64(uid),
				DpId:    int64(dp_Id),
				Title:   "用户Id为" + user_Id + "的用户和数字人" + digitalPerson.Name + "开始会话",
				IsliuId: ISULU_ID,
			}
			/*
			 往mysql数据库的会话表中插入一条记录
			*/
			err = models.InsertConversation(conversation)
			if err != nil {
				ctx.JSON(http.StatusOK, gin.H{"code": 0, "message": "创建会话后,向数据库插入数据失败!"})
				return
			}

		} else {
			ISULU_ID = conversation.IsliuId
			//NewConversation = *conversation
			// 如果查询到记录,调用查询意识流Id接口判断意识流id是否有效,若无效则修改意识流ID为新的意识流ID
			flag := HuaSoulFindISLIUID(conversation.IsliuId)
			fmt.Println("查询意识流Id, flag: ", flag)
			//没找到
			if !flag {
				//创建调用AI接口实例
				apiCode, ISULU_ID, err := HuaSoulNewAPI(int64(dp_Id))
				if err != nil {
					ctx.JSON(http.StatusOK, gin.H{"code": 0, "content": "数字人会话创建连接,失败!"})
				}
				if apiCode != 1 && ISULU_ID == "" {
					ctx.JSON(http.StatusOK, gin.H{"code": 0, "content": "创建调用数字人会话接口实例,失败!"})

				}
				// 把调用AI接口获取的意识流ID赋给conversation.IsliuId
				conversation.IsliuId = ISULU_ID

				err = models.UpdateConversation(conversation)

				if err != nil {
					ctx.JSON(http.StatusOK, gin.H{"code": 0, "message": "更新会话意识流id失败!"})
					return
				}
			}

		}

		// 创建一个用户客户端会话实例
		newClient := &Client{
			conversation: conversation,
			Socket:       conn,
			Send:         make(chan []byte),
		}
		// 用户会话注册到用户管理上
		Manager.Register <- newClient

		// ---------------------------------------- //

		url := config.Conf.SoulUrl + "/soul/neural"
		fileName := "./neural/" + ISULU_ID + ".CFN"
		// 获取文件信息
		_, err = os.Stat(fileName)
		if err == nil {
			fmt.Println("文件存在")
			// 构造请求参数
			request := SoulNeuralRequest{
				Auth:    "1d527cbf2cd50ad7e97f0aa23e2a712e",
				Type:    1, // 1表示上载
				ISLIUid: ISULU_ID,
			}
			// 调用函数
			_, err := SoulNeuralFileUpload(url, request, fileName)
			if err != nil {
				log.Printf("Error during soul neural transfer: %v", err)
			}
			// 处理响应
			//fmt.Println("Response received: ", response)

		} else if os.IsNotExist(err) {
			fmt.Println("文件不存在")
		} else {
			fmt.Println("获取文件信息出错:", err)
		}

		// ---------------------------------------- //
		//fmt.Println("调用socket的Read方法前:", ISULU_ID)
		go newClient.Read()
		go newClient.Write()

	} else {
		ctx.JSON(http.StatusOK, gin.H{"code": 0, "message": "用户未登录"})
		return
	}

}

type ErrorMessage struct {
	Code    int
	Message string
}

/*
 从websocket读取客户端用户的消息,然后服务器回应前端一个消息
*/
func (c *Client) Read() {
	defer func() { // 避免忘记关闭,所以要加上close
		Manager.Unregister <- c
		_ = c.Socket.Close()

		url := config.Conf.SoulUrl + "/soul/neural"
		fileName := "./neural/" + c.conversation.IsliuId + ".CFN"

		// 构造请求参数
		request := SoulNeuralRequest{
			Auth:    "1d527cbf2cd50ad7e97f0aa23e2xxx",
			Type:    0, // 0表示下载
			ISLIUid: c.conversation.IsliuId,
		}
		// 调用函数
		err1 := SoulNeuralFileDownload(url, request, fileName)
		if err1 != nil {
			log.Printf("Error during soul neural transfer: %v", err1)
		}

	}()

	for {
		c.Socket.PongHandler()
		sendMsg := new(SendMsg)
		err := c.Socket.ReadJSON(&sendMsg)
		if err != nil {
			log.Println("数据格式不正确", err)
			Manager.Unregister <- c
			_ = c.Socket.Close()

			// 向前端返回数据格式不正确的状态码和消息
			errorMsg := ErrorMessage{
				Code:    -1,
				Message: "数据格式不正确",
			}
			errmsg, _ := json.Marshal(errorMsg)
			_ = c.Socket.WriteMessage(websocket.TextMessage, errmsg)

			break
		}

		// 调用AI的会话接口
		resCode, status, ISLIU, chat_err := c.HuaSoulEchoAPI(sendMsg)
		if chat_err != nil {
			log.Println("会话失败")
			// 向前端返回数据格式不正确的状态码和消息
			errorMsg := ErrorMessage{
				Code:    -1,
				Message: "会话失败",
			}
			errmsg, _ := json.Marshal(errorMsg)
			_ = c.Socket.WriteMessage(websocket.TextMessage, errmsg)
			return
		}

		// 读取前端发送过来的消息并打印
		fmt.Println(sendMsg)

		/*
		   响应类型0 交互
		*/
		// 拼接转换会话Id
		conversation_Id := utils.Strval(c.conversation.Uid) + "_" + utils.Strval(c.conversation.DpId)

		if sendMsg.Type == 0 {
			/*
			   收到前端发送过来的消息,保存至数据库
			*/
			senderType := "user"
			chatDB_err := saveChatRecordMongoDB(conversation_Id, c.conversation.Uid, sendMsg.Content, senderType)
			// 错误处理
			if chatDB_err != nil {
				log.Println("保存用户聊天记录至数据库出错!")

				errorMsg := ErrorMessage{
					Code:    -1,
					Message: "保存用户聊天记录至数据库出错!",
				}
				errmsg, _ := json.Marshal(errorMsg)
				_ = c.Socket.WriteMessage(websocket.TextMessage, errmsg)

				return
			}

			/*
			   调用AI模型接口回复前端用户的消息
			*/
			HSReplyMsg := HuaSoulReplyMsg{
				Code:   resCode,
				Status: status,
				ISLIU:  ISLIU,
			}

			msg, _ := json.Marshal(HSReplyMsg)
			_ = c.Socket.WriteMessage(websocket.TextMessage, msg)
			/*
			   mongoDB保存数字人发送给前端用户的聊天记录
			*/

			senderType2 := "bot"
			chatDB_err2 := saveChatRecordMongoDB(conversation_Id, c.conversation.DpId, ISLIU, senderType2)
			// 错误处理
			if chatDB_err2 != nil {
				log.Println("保存数字人聊天记录至数据库出错!")

				errorMsg := ErrorMessage{
					Code:    -1,
					Message: "保存数字人聊天记录至数据库出错!",
				}
				errmsg, _ := json.Marshal(errorMsg)
				_ = c.Socket.WriteMessage(websocket.TextMessage, errmsg)

				return
			}

			/*
			  发送消息成功,resCode为1
			*/

			if resCode == 1 {
				//增加数字人的交互量
				err := models.IncrementDigitalPersonInteractionCount(c.conversation.DpId)
				if err != nil {
					log.Fatal("更新交互量失败:", err)
				}
				fmt.Println("交互量更新成功!")
			}

		}

		/*
		   type为1,表示用户无回应,数字人进行主动发送信息,进行主动关怀!!
		*/

		if sendMsg.Type == 1 {

			resCode, status, ISLIU, chat_err := c.HuaSoulEchoAPI(sendMsg)
			if chat_err != nil {
				log.Println("会话失败")
				// 向前端返回数据格式不正确的状态码和消息
				errorMsg := ErrorMessage{
					Code:    -1,
					Message: "会话失败",
				}
				errmsg, _ := json.Marshal(errorMsg)
				_ = c.Socket.WriteMessage(websocket.TextMessage, errmsg)
				return
			}
			HSReplyMsg := HuaSoulReplyMsg{
				Code:   resCode,
				Status: status,
				ISLIU:  ISLIU,
			}

			//解析AI接口返回的数据
			msg, _ := json.Marshal(HSReplyMsg)

			/*
			   mongoDB保存数字人发送给前端用户的聊天记录
			*/

			senderType := "bot"
			chatDB_err := saveChatRecordMongoDB(conversation_Id, c.conversation.DpId, ISLIU, senderType)
			// 错误处理
			if chatDB_err != nil {
				log.Println("保存数字人聊天记录至数据库出错!")

				errorMsg := ErrorMessage{
					Code:    -1,
					Message: "保存数字人聊天记录至数据库出错!",
				}
				errmsg, _ := json.Marshal(errorMsg)
				_ = c.Socket.WriteMessage(websocket.TextMessage, errmsg)

				return
			}

			// 回复数据至前端用户
			_ = c.Socket.WriteMessage(websocket.TextMessage, msg)

			/*
			  发送消息成功,resCode为1
			*/

			if resCode == 1 {
				//增加数字人的交互量
				err := models.IncrementDigitalPersonInteractionCount(c.conversation.DpId)
				if err != nil {
					log.Fatal("更新交互量失败:", err)
				}
				fmt.Println("交互量更新成功!")
			}

		}

	}

}

func (c *Client) Write() {
	defer func() {
		_ = c.Socket.Close()
	}()
	for {
		select {
		case message, ok := <-c.Send:
			if !ok {
				_ = c.Socket.WriteMessage(websocket.CloseMessage, []byte{})
				return
			}
			log.Println(c.conversation.Uid, "接受消息:", string(message))
			replyMsg := ReplyMsg{
				Code:    e.WebsocketSuccessMessage,
				Content: fmt.Sprintf("%s", string(message)),
			}
			msg, _ := json.Marshal(replyMsg)
			_ = c.Socket.WriteMessage(websocket.TextMessage, msg)
		}
	}
}

//保存聊天会话到MongoDB数据库
func saveChatRecordMongoDB(conversationId string, senderId int64, message string, senderType string) error {
	chatRecord, err := datasource.GetChatRecordByConversationID(conversationId)
	if err != nil {
		log.Println("保存聊天会话到MongoDB数据库,err: ", err)
		return err
	}
	//timeString := utils.GetNormalTimeString(time.Now())

	// 如果数据库中,聊天记录表没有这个聊天记录则插入一条
	if chatRecord == nil {
		chatRecordArray := datasource.ChatRecordArray{
			ConversationId: conversationId,
			Records: []datasource.ChatRecord{
				{SenderId: senderId, Timestamp: time.Now().Unix(), Message: message, SenderType: senderType},
			},
		}
		err = datasource.SaveChatRecordArray(chatRecordArray)
		if err != nil {
			log.Println("MongoDB数据库插入一条数据,err: ", err)
			return err
		}
		fmt.Println("ChatRecordArray  first saved successfully")
	} else {
		// 如果数据库中已经存在聊天记录,则更新记录
		chatRecords := []datasource.ChatRecord{
			{SenderId: senderId, Timestamp: time.Now().Unix(), Message: message, SenderType: senderType},
		}
		err := datasource.AddChatRecordToConversation(conversationId, chatRecords)
		if err != nil {
			log.Println("MongoDB数据库更新一条数据,err: ", err)
			return err
		}
		fmt.Println("ChatRecordArray saved successfully")
	}

	return nil
}

main方法中启动连接websocket服务

  • 启动聊天协程监听连接
    • go service.Manager.WebSocketStart()
package main

import (
	"xxx/src/config"
	"xxx/src/datasource"
	"xxx/src/routers"
	"xxx/src/service"
	"fmt"
	"time"
)

func main() {
	config.InitConfig()
	datasource.InitMysql()
	// 同步结构体与数据库表
	/*	err := datasource.Engine.Sync2(new(models.Conversation))
		if err != nil {
			log.Fatal("创建表失败:", err)
		}*/

	//println(utils.GetNormalTimeString(time.Now()))
	//fmt.Println(time.Now().Unix())

	// 将时间戳转换为时间对象
	tm := time.Unix(time.Now().Unix(), 0)
	// 将时间对象格式化为日期字符串
	dateString := tm.Format("2006-01-02 15:04:05") // 格式化模板根据需求进行调整,这里使用"2006-01-02"表示年月日

	fmt.Println("dateString: ", dateString)

	//启动聊天协程监听连接
	go service.Manager.WebSocketStart()
	router := routers.InitRouter()
	/*
		// 中间件对应的包:github.com/unrolled/secure

		router.Use(https_auth.HttpsHandler())                                                  //https对应的中间件
		path := "src/https_auth/CA/"                                                           //证书的路径
		router.RunTLS(":8080", path+"admin.mindepoch.com.pem", path+"admin.mindepoch.com.key") //开启HTTPS服务

	*/
	/*	println("education innovation revolution")
		println("教育 创新 变革")
		println("test ....")
	*/

	//初始化redis配置
	//utils.InitRedisStore()
	// Run("里面不指定端口号默认为8080")
	router.Run(":8089")

}

Logo

K8S/Kubernetes社区为您提供最前沿的新闻资讯和知识内容

更多推荐