字节为何选择go语言作为开发语言,gin框架实现一个webSocket聊天功能的案例
我们可以理解进程是计算机资源分配的最小单元,也是CPU分配资源的最小单位,具有独立的内存。go天生支持高并发,符合字节跳动这种流媒体的业务场景,而JAVA的优势是生态比较强大,从web应用到大数据、云计算,JAVA都有成熟的解决方案。go诞生于Google公司的工程师,是出自c语言之父之手,底层是拿c开发的,所以可想而知go具备高性能和天生支持高并发的特性。,像docker,k8s,底层都是拿go
go的语言特性符合字节业务场景
go天生支持高并发,符合字节跳动这种流媒体的业务场景,而JAVA的优势是生态比较强大,从web应用到大数据、云计算,JAVA都有成熟的解决方案。而go的性能比较高,又可以兼容C语言编写的程序,与C无缝衔接。就像scala兼容Java一样。而Java程序运行在虚拟机上,这一点就限制了java的性能。java为了应对高并发专一弄了一个juc的库。综合分析下来,go符合字节的业务,技术选型当然首选go了。当然,这里并不是说java语言不好,java有他自身的魅力和优势。
go诞生于Google公司的工程师,是出自c语言之父之手,底层是拿c开发的,所以可想而知go具备高性能和天生支持高并发的特性。随着go的兴起,go语言的生态也在不断完善…,像docker,k8s,底层都是拿go语言实现的。所以java有它的优势,go也有它的优势,没有谁比谁更好这一说,只有合适。
那为什么说go语言天生支持高并发呢?因为go语言中引入了协程的概念,也就是goroutine,协程是比线程更小的执行单元。我们可以理解进程是计算机资源分配的最小单元,也是CPU分配资源的最小单位,具有独立的内存。而线程是计算机调度的最小单元,也是程序执行的最小单元,一个进程中会有多个线程。要想实现高并发,就是基于CPU的分时调度模型,CPU在不同时间点来回切换执行,那么java的多线程操作,也是基于此,CPU在不同的时间点来回切换,但是在同一时间点只能有一个线程工作,另外的线程处于阻塞状态,由于切换较快看上去似乎是同一时间。go协程比线程轻量,cpu的开销小。而且在 Go 语言中,有一个单独的调度器(Scheduler)负责调度和管理 Goroutine 的执行。调度器会自动将 Goroutine 分配给可用的 CPU 核心,并根据需要进行抢占式调度,以实现并发的执行。这使得编写并发程序变得更加简单,无需手动管理线程的创建和调度。因此go语言在底层就实现了高并发。
gin框架实现一个webSocket聊天功能的案例
定义一个连接websocket的启动函数
package service
import (
//"chat/conf"
"xxx/src/config/e"
"xxx/src/src/utils"
"encoding/json"
_ "fmt"
//"github.com/gin-gonic/gin/binding"
"github.com/gorilla/websocket"
"log"
)
func (manager *ClientManager) WebSocketStart() {
for {
log.Println("<---监听管道通信--->")
select {
case conn := <-Manager.Register: // 建立连接
log.Printf("建立新连接: %v", conn.conversation.Uid)
Manager.Clients[utils.Strval(conn.conversation.Uid)] = conn
replyMsg := &ReplyMsg{
Code: e.WebsocketSuccess,
Content: "已连接至服务器",
}
//这里调用记忆文件方法
msg, _ := json.Marshal(replyMsg)
_ = conn.Socket.WriteMessage(websocket.TextMessage, msg)
case conn := <-Manager.Unregister: // 断开连接
log.Printf("连接失败:%v", conn.conversation.Uid)
if _, ok := Manager.Clients[utils.Strval(conn.conversation.Uid)]; ok {
replyMsg := &ReplyMsg{
Code: e.WebsocketEnd,
Content: "连接已断开",
}
msg, _ := json.Marshal(replyMsg)
_ = conn.Socket.WriteMessage(websocket.TextMessage, msg)
close(conn.Send)
delete(Manager.Clients, utils.Strval(conn.conversation.Uid))
}
//广播信息
}
//
}
}
定义一个chat聊天服务的程序
package service
import (
"xxx/src/config"
"xxx/src/config/e"
"xxx/src/src/controllers"
"xxx/src/datasource"
"xxx/src/models"
"xxx/src/utils"
"github.com/gin-contrib/sessions"
"os"
"strconv"
"time"
//"chat/cache"
//"chat/conf"
"encoding/json"
"fmt"
"github.com/gin-gonic/gin"
"github.com/gorilla/websocket"
"log"
"net/http"
//"strconv"
//"time"
)
//const month = 60 * 60 * 24 * 30 // 按照30天算一个月
// 发送消息的类型
type SendMsg struct {
Type int `json:"type"`
Content string `json:"content"`
}
// 回复的消息
/* 这个要注释掉 */
type ReplyMsg struct {
From string `json:"from"`
Code int `json:"code"`
Content string `json:"content"`
}
type HuaSoulReplyMsg struct {
Code int `json:"code"`
Status map[string]interface{} `json:"status"`
ISLIU string `json:"ISLIU"`
}
/*
构建Client表
*/
//Id|Uid|Did|title|descript
type Client struct {
conversation *models.Conversation
Socket *websocket.Conn
Send chan []byte
}
// 用户类
/*type Client struct {
ID string
SendID string
Socket *websocket.Conn
Send chan []byte
}*/
// 广播类,包括广播内容和源用户
/*type Broadcast struct {
Client *Client
Message []byte
Type int
}
*/
// 用户管理
type ClientManager struct {
Clients map[string]*Client
//Broadcast chan *Broadcast
Reply chan *Client
Register chan *Client
Unregister chan *Client
}
// Message 信息转JSON (包括:发送者、接收者、内容)
type Message struct {
Sender string `json:"sender,omitempty"`
Recipient string `json:"recipient,omitempty"`
Content string `json:"content,omitempty"`
}
var Manager = ClientManager{
Clients: make(map[string]*Client), // 参与连接的用户,出于性能的考虑,需要设置最大连接数
//Broadcast: make(chan *Broadcast),
Register: make(chan *Client),
Reply: make(chan *Client),
Unregister: make(chan *Client),
}
func WsHandler(ctx *gin.Context) {
/*
http协议升级为wobSocket协议
*/
conn, err := (&websocket.Upgrader{
CheckOrigin: func(r *http.Request) bool { // CheckOrigin解决跨域问题
return true
}}).Upgrade(ctx.Writer, ctx.Request, nil) // 升级成ws协议
if err != nil {
http.NotFound(ctx.Writer, ctx.Request)
return
}
/*
从session里获取用户id
*/
session := sessions.Default(ctx)
userId := session.Get("UserID")
if userId == nil {
// 处理session中未找到值的情况,返回错误信息
/*这里是双保险为了测试用*/
userId = ctx.Query("id")
if userId == nil {
ctx.JSON(http.StatusOK, gin.H{"code": 0, "message": "Session中未找到用户ID,用户未登录"})
return
}
}
user_Id, ok := userId.(string)
if !ok {
// 类型断言失败,处理类型不匹配的情况
ctx.JSON(http.StatusOK, gin.H{"code": 0, "message": "用户ID类型错误"})
return
}
//userId作为string类型的值进行后续操作
_, b := controllers.CacheCode.Get(user_Id)
uid, err := strconv.Atoi(user_Id)
if err != nil {
fmt.Println("用户id转换失败:", err)
return
}
//从session中获取用户id
/*
判断用户是否登录
*/
if b {
digital_Id := ctx.Query("DpId")
dp_Id, _ := strconv.Atoi(digital_Id)
//查询数据库,根据数字人Id查询数字人信息
digitalPerson, err := models.GetDigitalPersonByID(int64(dp_Id))
if err != nil {
ctx.JSON(http.StatusOK, gin.H{"code": 0, "message": "用户与数字人创建会话时,查询数字人失败"})
log.Println(err.Error())
return
}
fmt.Println(digitalPerson)
ctx.JSON(http.StatusOK, gin.H{"code": 1, "message": "用户与数字人创建会话时,查询数字人", "digitalPerson": digitalPerson})
// 根据用户id和数字人id,查询会话表中是否有会话记录
conversation, err := models.GetConversationsByUserIdAndDpId(int64(uid), int64(dp_Id))
if err != nil {
ctx.JSON(http.StatusOK, gin.H{"code": 0, "message": "查询会话记录失败!"})
return
}
var (
//conversation models.Conversation
ISULU_ID string
)
if conversation == nil {
// 如果查询不到记录,则插入一条新的会话记录
//创建调用AI接口实例
apiCode, ISULU_ID, err := HuaSoulNewAPI(int64(dp_Id))
if err != nil {
ctx.JSON(http.StatusOK, gin.H{"code": 0, "content": "数字人会话创建连接,失败!"})
}
if apiCode != 1 && ISULU_ID == "" {
ctx.JSON(http.StatusOK, gin.H{"code": 0, "content": "创建调用数字人会话接口实例,失败!"})
}
fmt.Println(ISULU_ID)
conversation = &models.Conversation{
Uid: int64(uid),
DpId: int64(dp_Id),
Title: "用户Id为" + user_Id + "的用户和数字人" + digitalPerson.Name + "开始会话",
IsliuId: ISULU_ID,
}
/*
往mysql数据库的会话表中插入一条记录
*/
err = models.InsertConversation(conversation)
if err != nil {
ctx.JSON(http.StatusOK, gin.H{"code": 0, "message": "创建会话后,向数据库插入数据失败!"})
return
}
} else {
ISULU_ID = conversation.IsliuId
//NewConversation = *conversation
// 如果查询到记录,调用查询意识流Id接口判断意识流id是否有效,若无效则修改意识流ID为新的意识流ID
flag := HuaSoulFindISLIUID(conversation.IsliuId)
fmt.Println("查询意识流Id, flag: ", flag)
//没找到
if !flag {
//创建调用AI接口实例
apiCode, ISULU_ID, err := HuaSoulNewAPI(int64(dp_Id))
if err != nil {
ctx.JSON(http.StatusOK, gin.H{"code": 0, "content": "数字人会话创建连接,失败!"})
}
if apiCode != 1 && ISULU_ID == "" {
ctx.JSON(http.StatusOK, gin.H{"code": 0, "content": "创建调用数字人会话接口实例,失败!"})
}
// 把调用AI接口获取的意识流ID赋给conversation.IsliuId
conversation.IsliuId = ISULU_ID
err = models.UpdateConversation(conversation)
if err != nil {
ctx.JSON(http.StatusOK, gin.H{"code": 0, "message": "更新会话意识流id失败!"})
return
}
}
}
// 创建一个用户客户端会话实例
newClient := &Client{
conversation: conversation,
Socket: conn,
Send: make(chan []byte),
}
// 用户会话注册到用户管理上
Manager.Register <- newClient
// ---------------------------------------- //
url := config.Conf.SoulUrl + "/soul/neural"
fileName := "./neural/" + ISULU_ID + ".CFN"
// 获取文件信息
_, err = os.Stat(fileName)
if err == nil {
fmt.Println("文件存在")
// 构造请求参数
request := SoulNeuralRequest{
Auth: "1d527cbf2cd50ad7e97f0aa23e2a712e",
Type: 1, // 1表示上载
ISLIUid: ISULU_ID,
}
// 调用函数
_, err := SoulNeuralFileUpload(url, request, fileName)
if err != nil {
log.Printf("Error during soul neural transfer: %v", err)
}
// 处理响应
//fmt.Println("Response received: ", response)
} else if os.IsNotExist(err) {
fmt.Println("文件不存在")
} else {
fmt.Println("获取文件信息出错:", err)
}
// ---------------------------------------- //
//fmt.Println("调用socket的Read方法前:", ISULU_ID)
go newClient.Read()
go newClient.Write()
} else {
ctx.JSON(http.StatusOK, gin.H{"code": 0, "message": "用户未登录"})
return
}
}
type ErrorMessage struct {
Code int
Message string
}
/*
从websocket读取客户端用户的消息,然后服务器回应前端一个消息
*/
func (c *Client) Read() {
defer func() { // 避免忘记关闭,所以要加上close
Manager.Unregister <- c
_ = c.Socket.Close()
url := config.Conf.SoulUrl + "/soul/neural"
fileName := "./neural/" + c.conversation.IsliuId + ".CFN"
// 构造请求参数
request := SoulNeuralRequest{
Auth: "1d527cbf2cd50ad7e97f0aa23e2xxx",
Type: 0, // 0表示下载
ISLIUid: c.conversation.IsliuId,
}
// 调用函数
err1 := SoulNeuralFileDownload(url, request, fileName)
if err1 != nil {
log.Printf("Error during soul neural transfer: %v", err1)
}
}()
for {
c.Socket.PongHandler()
sendMsg := new(SendMsg)
err := c.Socket.ReadJSON(&sendMsg)
if err != nil {
log.Println("数据格式不正确", err)
Manager.Unregister <- c
_ = c.Socket.Close()
// 向前端返回数据格式不正确的状态码和消息
errorMsg := ErrorMessage{
Code: -1,
Message: "数据格式不正确",
}
errmsg, _ := json.Marshal(errorMsg)
_ = c.Socket.WriteMessage(websocket.TextMessage, errmsg)
break
}
// 调用AI的会话接口
resCode, status, ISLIU, chat_err := c.HuaSoulEchoAPI(sendMsg)
if chat_err != nil {
log.Println("会话失败")
// 向前端返回数据格式不正确的状态码和消息
errorMsg := ErrorMessage{
Code: -1,
Message: "会话失败",
}
errmsg, _ := json.Marshal(errorMsg)
_ = c.Socket.WriteMessage(websocket.TextMessage, errmsg)
return
}
// 读取前端发送过来的消息并打印
fmt.Println(sendMsg)
/*
响应类型0 交互
*/
// 拼接转换会话Id
conversation_Id := utils.Strval(c.conversation.Uid) + "_" + utils.Strval(c.conversation.DpId)
if sendMsg.Type == 0 {
/*
收到前端发送过来的消息,保存至数据库
*/
senderType := "user"
chatDB_err := saveChatRecordMongoDB(conversation_Id, c.conversation.Uid, sendMsg.Content, senderType)
// 错误处理
if chatDB_err != nil {
log.Println("保存用户聊天记录至数据库出错!")
errorMsg := ErrorMessage{
Code: -1,
Message: "保存用户聊天记录至数据库出错!",
}
errmsg, _ := json.Marshal(errorMsg)
_ = c.Socket.WriteMessage(websocket.TextMessage, errmsg)
return
}
/*
调用AI模型接口回复前端用户的消息
*/
HSReplyMsg := HuaSoulReplyMsg{
Code: resCode,
Status: status,
ISLIU: ISLIU,
}
msg, _ := json.Marshal(HSReplyMsg)
_ = c.Socket.WriteMessage(websocket.TextMessage, msg)
/*
mongoDB保存数字人发送给前端用户的聊天记录
*/
senderType2 := "bot"
chatDB_err2 := saveChatRecordMongoDB(conversation_Id, c.conversation.DpId, ISLIU, senderType2)
// 错误处理
if chatDB_err2 != nil {
log.Println("保存数字人聊天记录至数据库出错!")
errorMsg := ErrorMessage{
Code: -1,
Message: "保存数字人聊天记录至数据库出错!",
}
errmsg, _ := json.Marshal(errorMsg)
_ = c.Socket.WriteMessage(websocket.TextMessage, errmsg)
return
}
/*
发送消息成功,resCode为1
*/
if resCode == 1 {
//增加数字人的交互量
err := models.IncrementDigitalPersonInteractionCount(c.conversation.DpId)
if err != nil {
log.Fatal("更新交互量失败:", err)
}
fmt.Println("交互量更新成功!")
}
}
/*
type为1,表示用户无回应,数字人进行主动发送信息,进行主动关怀!!
*/
if sendMsg.Type == 1 {
resCode, status, ISLIU, chat_err := c.HuaSoulEchoAPI(sendMsg)
if chat_err != nil {
log.Println("会话失败")
// 向前端返回数据格式不正确的状态码和消息
errorMsg := ErrorMessage{
Code: -1,
Message: "会话失败",
}
errmsg, _ := json.Marshal(errorMsg)
_ = c.Socket.WriteMessage(websocket.TextMessage, errmsg)
return
}
HSReplyMsg := HuaSoulReplyMsg{
Code: resCode,
Status: status,
ISLIU: ISLIU,
}
//解析AI接口返回的数据
msg, _ := json.Marshal(HSReplyMsg)
/*
mongoDB保存数字人发送给前端用户的聊天记录
*/
senderType := "bot"
chatDB_err := saveChatRecordMongoDB(conversation_Id, c.conversation.DpId, ISLIU, senderType)
// 错误处理
if chatDB_err != nil {
log.Println("保存数字人聊天记录至数据库出错!")
errorMsg := ErrorMessage{
Code: -1,
Message: "保存数字人聊天记录至数据库出错!",
}
errmsg, _ := json.Marshal(errorMsg)
_ = c.Socket.WriteMessage(websocket.TextMessage, errmsg)
return
}
// 回复数据至前端用户
_ = c.Socket.WriteMessage(websocket.TextMessage, msg)
/*
发送消息成功,resCode为1
*/
if resCode == 1 {
//增加数字人的交互量
err := models.IncrementDigitalPersonInteractionCount(c.conversation.DpId)
if err != nil {
log.Fatal("更新交互量失败:", err)
}
fmt.Println("交互量更新成功!")
}
}
}
}
func (c *Client) Write() {
defer func() {
_ = c.Socket.Close()
}()
for {
select {
case message, ok := <-c.Send:
if !ok {
_ = c.Socket.WriteMessage(websocket.CloseMessage, []byte{})
return
}
log.Println(c.conversation.Uid, "接受消息:", string(message))
replyMsg := ReplyMsg{
Code: e.WebsocketSuccessMessage,
Content: fmt.Sprintf("%s", string(message)),
}
msg, _ := json.Marshal(replyMsg)
_ = c.Socket.WriteMessage(websocket.TextMessage, msg)
}
}
}
//保存聊天会话到MongoDB数据库
func saveChatRecordMongoDB(conversationId string, senderId int64, message string, senderType string) error {
chatRecord, err := datasource.GetChatRecordByConversationID(conversationId)
if err != nil {
log.Println("保存聊天会话到MongoDB数据库,err: ", err)
return err
}
//timeString := utils.GetNormalTimeString(time.Now())
// 如果数据库中,聊天记录表没有这个聊天记录则插入一条
if chatRecord == nil {
chatRecordArray := datasource.ChatRecordArray{
ConversationId: conversationId,
Records: []datasource.ChatRecord{
{SenderId: senderId, Timestamp: time.Now().Unix(), Message: message, SenderType: senderType},
},
}
err = datasource.SaveChatRecordArray(chatRecordArray)
if err != nil {
log.Println("MongoDB数据库插入一条数据,err: ", err)
return err
}
fmt.Println("ChatRecordArray first saved successfully")
} else {
// 如果数据库中已经存在聊天记录,则更新记录
chatRecords := []datasource.ChatRecord{
{SenderId: senderId, Timestamp: time.Now().Unix(), Message: message, SenderType: senderType},
}
err := datasource.AddChatRecordToConversation(conversationId, chatRecords)
if err != nil {
log.Println("MongoDB数据库更新一条数据,err: ", err)
return err
}
fmt.Println("ChatRecordArray saved successfully")
}
return nil
}
main方法中启动连接websocket服务
- 启动聊天协程监听连接
- go service.Manager.WebSocketStart()
package main
import (
"xxx/src/config"
"xxx/src/datasource"
"xxx/src/routers"
"xxx/src/service"
"fmt"
"time"
)
func main() {
config.InitConfig()
datasource.InitMysql()
// 同步结构体与数据库表
/* err := datasource.Engine.Sync2(new(models.Conversation))
if err != nil {
log.Fatal("创建表失败:", err)
}*/
//println(utils.GetNormalTimeString(time.Now()))
//fmt.Println(time.Now().Unix())
// 将时间戳转换为时间对象
tm := time.Unix(time.Now().Unix(), 0)
// 将时间对象格式化为日期字符串
dateString := tm.Format("2006-01-02 15:04:05") // 格式化模板根据需求进行调整,这里使用"2006-01-02"表示年月日
fmt.Println("dateString: ", dateString)
//启动聊天协程监听连接
go service.Manager.WebSocketStart()
router := routers.InitRouter()
/*
// 中间件对应的包:github.com/unrolled/secure
router.Use(https_auth.HttpsHandler()) //https对应的中间件
path := "src/https_auth/CA/" //证书的路径
router.RunTLS(":8080", path+"admin.mindepoch.com.pem", path+"admin.mindepoch.com.key") //开启HTTPS服务
*/
/* println("education innovation revolution")
println("教育 创新 变革")
println("test ....")
*/
//初始化redis配置
//utils.InitRedisStore()
// Run("里面不指定端口号默认为8080")
router.Run(":8089")
}
更多推荐
所有评论(0)