前言

在我的前一篇文章里

vue+springboot实现聊天功能

🎈🎈🎈🎈🎈🎈🎈

实现了最最基础的聊天功能,可以通过聊天互相给对方发送信息

🎈🎈🎈🎈🎈🎈🎈

那么上次的文章是通过定时任务定时的刷新聊天数据,实现监听聊天记录的,那么这里来做点不一样的东西

🌸🌸🌸🌸🌸🌸🌸🌸🌸🌸🌸
接下来我会通过websocket,通过教学实例,教大家如何实现如下功能点

🌰 实时获取聊天数据
🌰 实时更新在线人员
🌰 实时更新未读消息数量

websocket

什么是websocket

WebSocket是一种在单个TCP连接上进行全双工通信的协议。它允许在客户端和服务器之间建立持久连接,从而实现实时的双向通信。相比传统的HTTP请求-响应模式,WebSocket提供了更高效、更实时的通信方式。

简单的说,就是通过服务端主动向客户端发送信息,也就是后端主动给前端发信息

websocket的优点

实时性: WebSocket允许服务器主动向客户端推送数据,而不需要客户端首先发送请求。这使得实时通信变得更加容易实现,例如在线聊天、实时游戏等。

减少延迟: 由于WebSocket建立了持久连接,避免了HTTP的每次请求-响应的开销,因此可以减少通信的延迟,使得应用程序的响应速度更快。

减少网络流量: WebSocket的头部信息相比于HTTP较小,且不需要在每次通信时重复发送,因此可以减少网络流量。

更少的资源消耗: WebSocket的连接一旦建立就可以持续存在,不需要像HTTP一样频繁地建立和关闭连接,这可以减少服务器和客户端的资源消耗。

更好的跨域支持: WebSocket协议支持跨域通信,使得在不同域名下的服务器和客户端之间进行实时通信更加方便。

🍒🍒🍒🍒🍒🍒🍒🍒🍒🍒

项目演示

项目展示图

在这里插入图片描述

同样在讲解如何实现之前,和大家演示一下我项目中实际的效果是什么样

在用户区,默认会抓取注册用户特定数量的用户数据,按照在线状态,是否和自己聊天进行展示顺序,如果有查人员会过滤只查特定用户名的数据,否则按照默认抓取

在这里插入图片描述
当输入用户名:
在这里插入图片描述

当有用户登录或者登出的时候更新人员在线状态

初始时候:
在这里插入图片描述
假如用户11111进行登出:
在这里插入图片描述
在这里插入图片描述
可以看到,该用户在线状态为关闭

然后再进行登录11111
在这里插入图片描述
在这里插入图片描述

假如我对用户进行发送聊天信息,在未打开聊天框的情况下:
在这里插入图片描述
发送信息之后:
在这里插入图片描述
会立刻同步信息

此时打开聊天框之后:
已读数量重置:
在这里插入图片描述

📌📌📌📌📌📌📌📌📌📌

以上为我项目中实现上述功能的演示,接下来我将讲解如何实现

前端

那么正式开始之前,我的表同前面的文章所写,就两张表,一张用户表,一张记录信息表

websocket.js

这里写一个获取websocket的工具类

// WebSocket地址
const url = 'ws://127.0.0.1:8082/sso/webSocket/'

// WebSocket实例
let ws

// 重连定时器实例
let reconnectTimer

// WebSocket重连开关
let isReconnecting = false

// WebSocket对象
const websocket = {
  // WebSocket建立连接
  Init (username) {
    // 判断浏览器是否支持WebSocket
    if (!('WebSocket' in window)) {
      console.log('浏览器不支持WebSocket')
      return
    }

    // 创建WebSocket实例
    ws = new WebSocket(url + username)

    // 监听WebSocket连接
    ws.onopen = () => {
      console.log('WebSocket连接成功')
    }

    // 监听WebSocket连接错误信息
    ws.onerror = (e) => {
      console.log('WebSocket重连开关', isReconnecting)
      console.log('WebSocket数据传输发生错误', e)
      // 打开重连
      reconnect()
    }

    // 监听WebSocket接收消息
    ws.onmessage = (e) => {
      console.log('WebSocket接收后端消息:', e)
      // 心跳消息不做处理
      if (e.data === 'ok') {
        return
      }

      // 调用回调函数处理接收到的消息
      if (websocket.onMessageCallback) {
        websocket.onMessageCallback(e.data)
      }
    }
  },

  // WebSocket连接关闭方法
  Close () {
    // 关闭断开重连机制
    isReconnecting = true
    ws.close()
    // ElMessage.error('WebSocket断开连接')
  },

  // WebSocket发送信息方法
  Send (data) {
    // 处理发送数据JSON字符串
    const msg = JSON.stringify(data)
    // 发送消息给后端
    ws.send(msg)
  },

  // 暴露WebSocket实例,其他地方调用就调用这个
  getWebSocket () {
    return ws
  },

  // 新增回调函数用于处理收到的消息
  onMessageCallback: null,

  // 设置消息处理回调函数
  setMessageCallback (callback) {
    this.onMessageCallback = callback
  }
}

// 监听窗口关闭事件,当窗口关闭时-每一个页面关闭都会触发-扩张需求业务
window.onbeforeunload = function () {
  // 在窗口关闭时关闭 WebSocket 连接
  websocket.Close()
  console.log('WebSocket窗口关闭事件触发')
}

// 浏览器刷新重新连接
// 刷新页面后需要重连-并且是在登录之后
if (performance.getEntriesByType('navigation')[0].type === 'reload') {
  console.log('WebSocket浏览器刷新了')

  // 延迟一定时间再执行 WebSocket 初始化,确保页面完全加载后再进行连接
  setTimeout(() => {
    console.log('WebSocket执行刷新后重连...')
    // 刷新后重连
    // 获取登录用户id
    let userId = ''
    websocket.Init(userId)
  }, 200) // 适当调整延迟时间
}

// 重连方法
function reconnect () {
  // 判断是否主动关闭连接
  if (isReconnecting) {
    return
  }
  // 重连定时器-每次WebSocket错误方法onerror触发它都会触发
  reconnectTimer && clearTimeout(reconnectTimer)
  reconnectTimer = setTimeout(function () {
    console.log('WebSocket执行断线重连...')
    // 获取登录用户id
    let userId = ''
    websocket.Init(userId)
    isReconnecting = false
  }, 4000)
}

// 暴露对象
export default websocket

webSocketApi

  • 这个文件单纯的连接后端ajax的
import request from '@/utils/request'

// 发送信息
export function sendMessageTo (data) {
  return request({
    url: '/webSocket/sendMessageTo',
    method: 'post',
    data: data
  })
}

// 获取在线人数
export function getOnLineUser () {
  return request({
    url: '/webSocket/getOnLineUser',
    method: 'get',
    params: {}
  })
}

// 发送信息给所有人
export function sendMessageAll (message) {
  return request({
    url: '/webSocket/sendMessageAll',
    method: 'get',
    params: {
      message
    }
  })
}

界面

<template>
  <div class="chat-container">
    <!-- Left side: User list -->
    <div class="left-side">
      <!-- Search input (moved outside) -->
      <div class="search-wrapper">
        <el-input v-model="searchUserName" placeholder="回车搜索用户" class="search-input" @keydown.enter.native="searchUserForForm"></el-input>
      </div>
      <!-- User list (with scroll) -->
      <el-scrollbar class="user-list-scroll">
        <el-row>
          <el-col :span="24" v-for="form in messageForm" :key="form.sendUser.userId" @click.native="chooseUser(form.sendUser)" class="user-item" v-if="messageForm.length !== 0">
            <div class="user-avatar-wrapper">
              <div :class="{ 'online-dot': form.isOnline }"></div>
              <!-- Element UI Badge for showing unread messages -->
              <el-badge :value="form.noReadMessageLength" class="message-badge" v-if="form.noReadMessageLength > 0">
                <img :src="form.sendUser.avatar" class="user-avatar">
              </el-badge>
              <img :src="form.sendUser.avatar" class="user-avatar" v-else>
            </div>
            <div class="user-details">
              <div class="user-name">{{ form.sendUser.userName }}</div>
              <div class="user-last-message">{{ form.lastMessage }}&nbsp;</div>
            </div>
          </el-col>
        </el-row>
      </el-scrollbar>
    </div>
    <!-- Right side: Chat box -->
    <div class="right-side">
      <!-- Chat header -->
      <div class="chat-header">
        <span v-if="currentUser">{{ currentUser.userName }}</span>
      </div>
      <!-- Chat messages -->
      <el-scrollbar class="chat-messages" ref="messageContainer">
        <div class="messageBox" v-for="message in messages" :key="message.handle" :class="{ ownMessage: message.sendUser === loginUser.userId, otherMessage: message.sendUser !== loginUser.userId }">
          <div><img :src="message.sendUser === loginUser.userId ? loginUser.avatar : currentUser.avatar" alt="" style="border: 1px solid #70c1fa;"></div>
          <div class="messageContent">{{ message.content }}</div>
          <div class="messageTime">{{ message.createTime.replace('T', ' ') }}</div>
        </div>
      </el-scrollbar>
      <div class="chat-input">
        <el-input v-model="newMessage.content" placeholder="请输入聊天内容" autosize class="message-input" @keydown.enter.native="sendMessage"></el-input>
        <el-button type="primary" @click.native="sendMessage" class="send-button">发送</el-button>
      </div>
    </div>
  </div>
</template>

逻辑

<script>
// 获取用户信息和聊天信息后端接口,根据自己的实际项目修改
import {findMessageBySendUserAndReceiveUser, searchUserForForm} from '../api/message'
// 发送信息给指定userId的websocket
import {sendMessageTo} from '../api/webSocketApi'
// 根据userId获取用户信息
import {searchUserByUserId} from '../api/user'
// 刚刚写的websocket
import websocket from '../utils/webSocket'

export default {
  data () {
    return {
      currentUser: null, // 当前聊天的人
      loginUser: null,
      messages: [],
      messageForm: [], // 聊天所有信息
      newMessage: {
        handle: '',
        sendUser: '',
        receiveUser: '',
        content: '',
        is_read: '0',
        createTime: ''
      },
      searchUserName: ''
    }
  },
  methods: {
    async fetchMessages (userId) {
      if (!userId) {
        this.searchUserForForm()
        return
      }
      if (this.loginUser.userId == null) {
        this.$message.error('登录用户编号获取失败,请重新登录!')
        return
      }
      findMessageBySendUserAndReceiveUser(userId, this.loginUser.userId ).then(async res => {
        await this.searchUserForForm()
        this.messages = res.value
        // 将聊天记录总下拉到最下方
        this.$nextTick(() => {
          this.scrollToBottom()
        })
      })
    },
    sendMessage () {
      if (!this.newMessage.content.trim()) {
        this.$message.warning('请输入聊天内容')
        return
      }
      this.newMessage.content = this.newMessage.content.trim()
      if (this.loginUser.userId == null) {
        this.$message.error('登录用户编号获取失败,请重新登录!')
        return
      }
      if (this.loginUser.userId  === this.currentUser.userId) {
        this.$message.error('不能给自己发送信息!')
        return
      }
      this.newMessage.sendUser = this.loginUser.userId 
      this.newMessage.receiveUser = this.currentUser.userId
      this.sendWebSocketMessage(this.newMessage)
      sendMessageTo(this.newMessage).then(res => {
        if (res.header.code !== 0) {
          this.$message.error(res.header.message)
          return
        }
        this.chooseUser(this.currentUser)
      })
    },
    // 消息过多的时候滚动到最新消息位置
    scrollToBottom () {
      // 使用 $refs 来获取对消息容器的引用
      const container = this.$refs.messageContainer.$refs.wrap
      // 滚动到底部
      container.scrollTop = container.scrollHeight
    },
    checkAvatar (avatar) {
      if (avatar && avatar !== undefined) {
        return avatar
      }
      return ''
    },
    chooseUser (user) {
      this.currentUser = user
      this.fetchMessages(user.userId)
    },
    // websocket
    async connectWebSocket (userId) {
      await new Promise((resolve) => {
        websocket.Init(userId)

        // 在 WebSocket 连接成功时调用 resolve
        websocket.getWebSocket().onopen = () => {
          console.log('WebSocket连接成功')
          resolve()
        }
      })
    },
    // 发送信息
    sendWebSocketMessage (message) {
      websocket.getWebSocket().onmessage = (event) => {
        // 处理消息,这里你可以根据实际需求更新页面上的数据
        console.log('收到的消息WebSocket2:', event)
        // 更新收到的消息
        // receivedMessage.value = event.data
        if (this.currentUser) {
          this.fetchMessages(this.currentUser.userId)
        } else {
          this.fetchMessages()
        }
      }
    },
    handleMessage (message) {
      // 内容进行相应的处理
      const parsedMessage = JSON.parse(message)
      console.log('收到信息:', parsedMessage)
      if (this.currentUser) {
        this.fetchMessages(this.currentUser.userId)
      } else {
        this.fetchMessages()
      }
    },
    // 获取所有信息
    searchUserForForm () {
      if (this.loginUser !== null) {
        searchUserForForm(this.loginUser.userId, this.searchUserName).then(res => {
          if (res.header.code !== 0) {
            this.$message.error(res.header.message)
            return
          }
          this.messageForm = res.value
        })
      }
    },
    // 初始化websocket
     async connectWebSocket (userId) {
      await new Promise((resolve) => {
        websocket.Init(userId)

        // 在 WebSocket 连接成功时调用 resolve
        websocket.getWebSocket().onopen = () => {
          console.log('WebSocket连接成功')
          resolve()
        }
      })
    },
  },
  mounted () {
    websocket.setMessageCallback((res) => {
      this.handleMessage(res)
    })
  },
  created () {
  },
  beforeCreate () {
  // 获取登录用户userId,请根据自己实际项目获取
  let userId = ''
  this.connectWebSocket(userId)
    if (userId) {
      searchUserByUserId(userId).then(res => {
        if (res.header.code === 0) {
          if (res.value) {
            this.loginUser = res.value
          } else {
            this.loginUser.userId = userId
          }
        }
      }).then(() => {
        this.searchUserForForm()
      })
    }
  }
}
</script>

样式


<style scoped>
.chat-container {
  display: flex;
  height: 100%;
  background: linear-gradient(to bottom right, #FFFFFF, #ECEFF1);
}

.left-side {
  position: relative; /* Position relative for absolute positioning */
  flex: 1;
  padding: 20px;
  border-right: 1px solid #eaeaea;
  border-radius: 10px;
  box-shadow: 0 0 10px rgba(0, 0, 0, 0.1);
}

.search-input {
  position: absolute;
  top: 20px;
  left: 20px;
  width: calc(100% - 40px);
  max-width: 300px;
}

.user-list-scroll {
  top: 40px;
  height: calc(100% - 40px);
  overflow-y: auto;
}

.user-avatar-wrapper {
  position: relative;
  display: inline-block;
}

.user-avatar {
  width: 40px;
  height: 40px;
  border-radius: 50%;
  margin-right: 10px;
  border: 1px solid #74ffd2;
}

.user-name {
  font-weight: 800;
  white-space: nowrap; /* 不换行 */
  overflow: hidden; /* 溢出隐藏 */
  padding-left: 15px;
  text-overflow: ellipsis; /* 超出显示省略号 */
  text-align: left; /* 添加左对齐属性 */
}

.user-last-message {
  color: #a19f9f;
  font-size: 14px;
  white-space: nowrap;
  overflow: hidden;
  padding-left: 15px;
  text-overflow: ellipsis;
  text-align: left; /* 添加左对齐属性 */
}

.right-side {
  flex: 3;
  display: flex;
  flex-direction: column;
}

.chat-header {
  padding: 20px;
  border-bottom: 1px solid #eaeaea;
  font-size: 1.2em;
  color: #37474F;
}

.chat-messages {
  flex: 1;
  overflow-y: auto;
  padding: 20px;
}

.chat-input {
  padding: 20px;
  display: flex;
  align-items: center;
}

.message-input {
  flex: 1;
  margin-right: 10px;
}

.send-button {
  flex-shrink: 0;
}

.user-item {
  display: flex;
  align-items: center;
  padding: 10px;
  border-bottom: 1px solid #f0f0f2;
}

.user-item:hover {
  background-color: #E0E0E0;
  cursor: pointer;
  transition: background-color 0.3s ease;
}

.user-details {
  flex-grow: 1; /* 填充剩余空间 */
}

.messageBox {
  display: flex;
  align-items: flex-start; /* 将头像和文本第一行对齐 */
  margin-bottom: 10px;
}

.messageBox img {
  width: 40px; /* 调整头像大小 */
  height: 40px;
  border-radius: 50%;
  margin-right: 10px;
  margin-left: 10px;
}

.messageContent {
  max-width: 70%; /* 调整发送信息宽度 */
  padding: 10px;
  border-radius: 8px;
  background-color: #f0f0f0;
  text-align: left; /* 文本左对齐 */
  word-wrap: break-word; /* 当文本过长时自动换行 */
}

.messageTime {
  font-size: 12px;
  color: #999;
  margin-left: 10px;
  margin-top: 5px; /* 将发送时间与文本分隔开 */
}

.ownMessage {
  flex-direction: row-reverse;
  align-items: flex-end; /* 将发送时间放置在最下方的贴右位置 */
}

.otherMessage {
  flex-direction: row;
  align-items: flex-end; /* 将发送时间放置在最下方的贴左位置 */
}

.online-dot {
  position: absolute;
  top: 0;
  left: 0;
  z-index: 1;
  width: 10px;
  height: 10px;
  background-color: #01c201;
  border-radius: 50%;
}
.message-badge .el-badge__content {
  position: absolute;
  bottom: 5px; /* Adjust to your desired position */
  left: 5px; /* Adjust to your desired position */
  background-color: #f56c6c; /* Red background for visibility */
  color: white; /* White text color */
}
</style>

✨✨✨✨✨✨✨✨✨✨✨✨✨✨✨✨

以上主逻辑就OK了 ,其他的如axios调用后端接口的逻辑,等等省略

后端

首先我们现安装依赖

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-websocket</artifactId>
    <version>2.7.0</version> <!-- 可以根据你的Spring Boot版本调整这个版本号 -->
</dependency>

如上你的为springboot项目的话是可以省略版本的,自动查找对应版本

实体

对应我的用户实体为:

@Data
public class User {

  private String userId;
  private String avatar;
  private String userName;
  private String password;
  private String salt;
  private String email;
  private String phone;
  private String sex;
  private Integer age;
  private Integer status;
  @JsonFormat(pattern = "yyyy-MM-dd HH:mm:ss")
  private LocalDateTime createTime;
  @JsonFormat(pattern = "yyyy-MM-dd HH:mm:ss")
  private LocalDateTime updateTime;
}

消息实体:

@Getter
@Setter
@ToString

public class Message {

  private String handle;

  private String sendUser;

  private String receiveUser;

  private String content;

  private String isRead;

  private LocalDateTime createTime;
}

websocket存储实体:


import javax.websocket.Session;
import lombok.Data;

@Data
public class WebSocket {

  private Session session;
  private String userId;
}

对应为了实现该功能而搭建的实体

import com.pearl.entitys.dataBaseTable.User;
import java.util.ArrayList;
import java.util.List;
import lombok.Data;

@Data
public class MessageForm {

  // 发送用户和接收用户完整聊天消息列表
  private List<Message> messages = new ArrayList<>();
  // 未读消息数量
  private Integer noReadMessageLength;
  // 在线标识
  private Boolean isOnline;
  // 发送信息用户
  private User sendUser;
  // 接收信息用户,偏向于赋值登录人员用户信息
  private User receiveUser;
  // 最新一条聊天记录
  private String lastMessage;
}

websocket类

import cn.hutool.core.date.DateUtil;
import com.alibaba.fastjson.JSONObject;
import com.pearl.entitys.beans.webSocket.WebSocket;
import com.pearl.entitys.dataBaseTable.Message;
import java.io.IOException;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import javax.websocket.CloseReason;
import javax.websocket.OnClose;
import javax.websocket.OnError;
import javax.websocket.OnMessage;
import javax.websocket.OnOpen;
import javax.websocket.Session;
import javax.websocket.server.PathParam;
import javax.websocket.server.ServerEndpoint;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.springframework.stereotype.Component;

/**
 * @program: tools
 * @Description: 通过这个类进行连接WebSocket的,默认发信息就进入onMessage解析
 */
@Component
@ServerEndpoint(value = "/webSocket/{userId}")
@Slf4j
public class WebSocketUtil {

  /**
   * 登录连接数 应该也是线程安全的
   */
  private static int loginCount = 0;
  /**
   * user 线程安全的
   */
  private static final Map<String, WebSocket> userMap = new ConcurrentHashMap<String, WebSocket>();

  /**
   * @Description: 收到消息触发事件,这个消息是连接人发送的消息
   * @Param [messageInfo, session]
   * @Return: void { "userId": "test2", "message": "你收到了嘛?这是用户test发的消息!" }
   **/
  @OnMessage
  public void onMessage(String messageInfo, Session session)
      throws IOException, InterruptedException {
    if (StringUtils.isBlank(messageInfo)) {
      return;
    }
    // 当前用户
    String userIdTo = session.getPathParameters().get("userId");
    // JSON数据
    log.info("onMessage:{}", messageInfo);
    System.out.println("接收信息:" + messageInfo + "," + userIdTo);
//    Map map = JSON.parseObject(messageInfo, Map.class);
//    // 接收人
//    String userId = (String) map.get("userId");
//    // 消息内容
//    String message = (String) map.get("message");
    // 发送给指定用户
    sendMessageTo(messageInfo, userIdTo);
    log.info(DateUtil.now() + " | " + userIdTo + " 私人消息-> " + messageInfo, userIdTo);
  }

  /**
   * @Description: 打开连接触发事件
   * @Param [account, session, config]
   * @Return: void
   **/
  @OnOpen
  public void onOpen(@PathParam("userId") String userId, Session session) throws IOException {
    WebSocket webSocket = new WebSocket();
    webSocket.setUserId(userId);
    webSocket.setSession(session);
    boolean containsKey = userMap.containsKey(userId);
    if (!containsKey) {
      // 添加登录用户数量
      addLoginCount();
      userMap.put(userId, webSocket);
    }
    log.info("打开连接触发事件!已连接用户: " + userId);
    log.info("当前在线人数: " + loginCount);
    Message message = new Message();
    message.setContent(userId + " 已连接");
    sendMessageAll(JSONObject.toJSONString(message));
  }

  /**
   * @Description: 关闭连接触发事件
   * @Param [session, closeReason]
   * @Return: void
   **/
  @OnClose
  public void onClose(@PathParam("userId") String userId, Session session,
      CloseReason closeReason) throws IOException {
    boolean containsKey = userMap.containsKey(userId);
    if (containsKey) {
      // 删除map中用户
      userMap.remove(userId);
      // 减少断开连接的用户
      reduceLoginCount();
    }
    log.info("关闭连接触发事件!已断开用户: " + userId);
    log.info("当前在线人数: " + loginCount);
    Message message = new Message();
    message.setContent(userId + " 已断开");
    sendMessageAll(JSONObject.toJSONString(message));
  }

  /**
   * @Description: 传输消息错误触发事件
   * @Param [error :错误]
   * @Return: void
   **/
  @OnError
  public void onError(Throwable error) {
    log.info("onError:{}", error.getMessage());
  }

  /**
   * @Description: 发送指定用户信息
   * @Param [message:信息, userId:用户]
   * @Return: void
   **/
  public void sendMessageTo(String message, String userId) throws IOException {
    for (WebSocket user : userMap.values()) {
      if (user.getUserId().equals(userId)) {
        System.out.println("用户:" + userId + "收到信息:" + message);
        user.getSession().getAsyncRemote().sendText(message);
      }
    }
  }

  /**
   * @Description: 发给所有人
   * @Param [message:信息]
   * @Return: void
   **/
  public void sendMessageAll(String message) throws IOException {
    for (WebSocket item : userMap.values()) {
      item.getSession().getAsyncRemote().sendText(message);
    }
  }

  /**
   * @Description: 连接登录数增加
   * @Param []
   * @Return: void
   **/
  public static synchronized void addLoginCount() {
    loginCount++;
  }

  /**
   * @Description: 连接登录数减少
   * @Param []
   * @Return: void
   **/
  public static synchronized void reduceLoginCount() {
    loginCount--;
  }

  /**
   * @Description: 获取用户
   * @Param []
   * @Return: java.util.Map<java.lang.String, com.cn.webSocket.WebSocket>
   **/
  public synchronized Map<String, WebSocket> getUsers() {
    return userMap;
  }

}

消息controller

import com.pearl.responseEntity.Response;
import com.pearl.service.MessageService;
import com.pearl.utils.db.PrimeDB;
import java.sql.Connection;
import java.util.List;
import java.util.Map;
import javax.annotation.Resource;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;

@RestController
@RequestMapping("/message")
public class MessageController {
// 我的数据库
  @Autowired
  private PrimeDB primeDB;
//逻辑层
  @Resource
  private MessageService messageService;

// 查找未读数量
  @GetMapping("/findNoReadMessageLength")
  public Response findNoReadMessage(@RequestParam("userId") String userId) {
    try (Connection conn = primeDB.create()) {
      Integer total = messageService.findNoReadMessageLength(conn, userId);
      return new Response(0, total, "查询成功!");
    } catch (Exception e) {
      return new Response(1, e.getMessage());
    }
  }

// 查找两个人的聊天记录
  @GetMapping("/findMessageBySendUserAndReceiveUser")
  public Response<List<Message>> findMessageBySendUserAndReceiveUser(
      @RequestParam("sendUserId") String sendUserId,
      @RequestParam("receiveUserId") String receiveUserId) {
    try (Connection conn = primeDB.create()) {
      return new Response<>(0,
          messageService.findMessageBySendUserAndReceiveUser(conn, sendUserId, receiveUserId),
          "查找成功!");
    } catch (Exception e) {
      return new Response<>(1, e.getMessage());
    }
  }
  // 发送信息
  @PostMapping("/sendMessage")
  public Response sendMessage(@RequestBody Message message) {
    try (Connection conn = primeDB.create()) {
      messageService.sendMessage(conn, message);
      return new Response(0, "发送成功!");
    } catch (Exception e) {
      return new Response(1, e.getMessage());
    }
  }

// 查找我的用户信息数据
  @GetMapping("searchUserForForm")
  public Response<List<MessageForm>> searchUserForForm(
      @RequestParam("loginUserId") String loginUserId,
      @RequestParam("searchUserName") String searchUserName) {
    try (Connection conn = primeDB.create()) {
      List<MessageForm> messages = messageService.searchUserForForm(conn,
          loginUserId, searchUserName);
      return new Response<>(0, messages, "查找成功!");
    } catch (Exception e) {
      return new Response<>(1, e.getMessage());
    }
  }
}

消息逻辑

import com.pearl.db.MessageDao;
import com.pearl.db.UserDao;
import com.pearl.entitys.beans.MessageForm;
import com.pearl.entitys.beans.webSocket.WebSocket;
import com.pearl.entitys.dataBaseTable.Message;
import com.pearl.entitys.dataBaseTable.User;
import com.pearl.utils.AssertUtils;
import com.pearl.utils.WebSocketUtil;
import com.pearl.utils.Where;
import java.sql.Connection;
import java.time.LocalDateTime;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.stream.Collectors;
import org.apache.commons.lang3.StringUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

@Service
public class MessageService {

  @Autowired
  private WebSocketUtil webSocketUtil;
  // 限制聊天记录数量
  final Integer limitMessagesLength = 6000;
  // 限制用户数量
  final Integer limitUserLength = 300;


  // 获取未读的接收信息
  public Integer findNoReadMessageLength(Connection conn, String userId) throws Exception {
    try {

      Integer totol = 0;
      AssertUtils.isError(StringUtils.isEmpty(userId), "用户编号不能为空!");
      UserDao userDao = new UserDao(conn);
      MessageDao messageDao = new MessageDao(conn);
      User user = userDao.selectbyUserId(userId);
      AssertUtils.isError(null == user, "用户编号:" + userId + "不存在!");
      // 为防止发送人特别多导致信息未获取,这里多设置一些拿信息数据
      List<Message> messages = messageDao
          .selectByReceiveUserLimitLength(userId, limitMessagesLength);
      Map<String, List<Message>> messageBySendUserMap = messages.stream()
          .collect(Collectors.groupingBy(Message::getSendUser));
      for (String sendUser : messageBySendUserMap.keySet()) {
        List<Message> receiveMessageList = messageBySendUserMap.get(sendUser);
        Integer noReadSize = receiveMessageList.stream().filter(o -> "0".equals(o.getIsRead()))
            .collect(Collectors.toList()).size();
        totol += noReadSize;
      }
      return totol;
    } catch (Exception e) {
      throw new Exception(e.getMessage());
    }
  }

// 发送信息的逻辑
  public void sendMessage(Connection conn, Message message) throws Exception {
    try {
      AssertUtils.isError(StringUtils.isEmpty(message.getSendUser()), "发送用户不能为空!");
      AssertUtils.isError(StringUtils.isEmpty(message.getReceiveUser()), "接收用户不能为空!");
      AssertUtils.isError(StringUtils.isEmpty(message.getContent()), "发送信息不能为空!");
      UserDao userDao = new UserDao(conn);
      MessageDao messageDao = new MessageDao(conn);
      User sendUser = userDao.selectbyUserId(message.getSendUser());
      AssertUtils.isError(null == sendUser, "发送用户不存在,发送信息失败!");
      AssertUtils.isError(sendUser.getStatus() != 1,
          "发送用户:" + message.getSendUser() + "状态已冻结,无法发送信息!");
      User receiveUser = userDao.selectbyUserId(message.getReceiveUser());
      AssertUtils.isError(null == receiveUser, "接收用户不存在,发送信息失败!");
      AssertUtils.isError(receiveUser.getStatus() != 1,
          "接收用户:" + message.getReceiveUser() + "状态已冻结,无法接收信息!");
      message.setHandle(UUID.randomUUID().toString());
      message.setIsRead("0");
      message.setCreateTime(LocalDateTime.now());
      messageDao.insert(message);
    } catch (Exception e) {
      throw new Exception(e.getMessage());
    }
  }
  // 获取两个人的聊天记录
  public List<Message> findMessageBySendUserAndReceiveUser(Connection conn, String sendUserId,
      String receiveUserId) throws Exception {
    try {
      AssertUtils.isError(StringUtils.isEmpty(sendUserId), "发送用户为空!");
      AssertUtils.isError(StringUtils.isEmpty(receiveUserId), "接收用户为空!");
      UserDao userDao = new UserDao(conn);
      User sendUser = userDao.selectbyUserId(sendUserId);
      AssertUtils.isError(null == sendUser, "发送用户不存在,发送信息失败!");
      User receiveUser = userDao.selectbyUserId(receiveUserId);
      AssertUtils.isError(null == receiveUser, "接收用户不存在,发送信息失败!");
      MessageDao messageDao = new MessageDao(conn);
      // 获取对方发送的信息
      List<Message> receiveMessageList = messageDao.selectBySendUserAndReceiveUserLimitLength(
          sendUserId,
          receiveUserId, limitMessagesLength);
      // 获取发送给对方的信息
      List<Message> sendMessageList = messageDao.selectBySendUserAndReceiveUserLimitLength(
          receiveUserId,
          sendUserId, limitMessagesLength);
      List<Message> allMessageList = new ArrayList<>();
      allMessageList.addAll(receiveMessageList);
      allMessageList.addAll(sendMessageList);
      List<Message> sortedMessageList = allMessageList.stream()
          .sorted(Comparator.comparing(Message::getCreateTime))
          .collect(Collectors.toList());

      // 设置已读
      List<Message> noReadMessageList = receiveMessageList.stream()
          .filter(o -> "0".equals(o.getIsRead()))
          .peek(message -> message.setIsRead("1"))
          .collect(Collectors.toList());
      if (noReadMessageList.size() > 0) {
        messageDao.update(noReadMessageList);
      }
      return sortedMessageList;
    } catch (Exception e) {
      throw new Exception(e.getMessage());
    }
  }

// 获取所有数据
  public List<MessageForm> findAllMessageForm(Connection conn, String userId) throws Exception {
    try {

      Map<String, WebSocket> users = webSocketUtil.getUsers();
      Set<String> ids = users.keySet();
      AssertUtils.isError(StringUtils.isEmpty(userId), "用户编号不能为空!");
      List<MessageForm> messageFormList = new ArrayList<>();
      UserDao userDao = new UserDao(conn);
      MessageDao messageDao = new MessageDao(conn);
      User loginUser = userDao.selectbyUserId(userId);
      AssertUtils.isError(null == loginUser, "用户编号:" + userId + "不存在!");
      messageFormList.addAll(findAllMessageChatDataWithLoginUserId(conn, userId));
      // 判断ids是否在messageFormList的sendUser的Id中,不是则获取新的数据到messageFormList
      for (String id : ids) {
        if (!messageFormList.stream().map(o -> o.getSendUser().getUserId())
            .collect(Collectors.toList())
            .contains(id)) {
          MessageForm messageForm = new MessageForm();
          User sendUserData = userDao.selectbyUserId(id);
          if (null == sendUserData) {
            continue;
          }
          List<Message> allMessageList = findBothMessages(userId, id,
              limitMessagesLength, messageDao);
          messageForm.setMessages(allMessageList);
          messageForm.setSendUser(sendUserData);
          messageForm.setReceiveUser(loginUser);
          messageForm.setIsOnline(true);
          messageForm.setNoReadMessageLength(0);
          messageForm.setLastMessage("");
          messageFormList.add(messageForm);
        }
      }
      // 获取所有messageFormList的sendUser的userId,
      List<String> sendUserIds = messageFormList.stream().map(MessageForm::getSendUser)
          .map(User::getUserId).collect(Collectors.toList());
      // 如果获取到的用户少于100个,则补齐剩余数量的用户数据,这里补齐的是从没和自己聊天过的用户
      if (sendUserIds.size() < limitUserLength) {
        Integer leaveCount = limitUserLength - sendUserIds.size();
        Where where = new Where();
        where.notIn("user_id", sendUserIds.toArray());
        where.limit(leaveCount);
        List<User> userList = userDao.selectByWhere(where);
        if (null != userList && userList.size() > 0) {
          for (User user : userList) {
            MessageForm messageForm = new MessageForm();
            messageForm.setSendUser(user);
            messageForm.setReceiveUser(loginUser);
            messageForm.setIsOnline(false);
            messageForm.setNoReadMessageLength(0);
            messageForm.setLastMessage("");
            messageFormList.add(messageForm);
          }
        }
      }
      // 按照在线状态为true,有聊天记录的优先展示
      messageFormList.sort((o1, o2) -> {
        if (o1.getIsOnline() && o2.getIsOnline()) {
          return o2.getMessages().size() - o1.getMessages().size();
        } else if (o1.getIsOnline()) {
          return -1;
        } else if (o2.getIsOnline()) {
          return 1;
        } else {
          return o2.getMessages().size() - o1.getMessages().size();
        }
      });
      return messageFormList;
    } catch (Exception e) {
      throw new Exception(e.getMessage());
    }
  }

// 获取登录用户所有聊过天的记录数据
  public List<MessageForm> findAllMessageChatDataWithLoginUserId(Connection conn, String userId)
      throws Exception {
    try {
      Map<String, WebSocket> users = webSocketUtil.getUsers();
      Set<String> ids = users.keySet();
      AssertUtils.isError(StringUtils.isEmpty(userId), "用户编号不能为空!");
      List<MessageForm> messageFormList = new ArrayList<>();
      UserDao userDao = new UserDao(conn);
      MessageDao messageDao = new MessageDao(conn);
      User loginUser = userDao.selectbyUserId(userId);
      AssertUtils.isError(null == loginUser, "用户编号:" + userId + "不存在!");
      // 获取所有有发送信息给自己聊天的用户
      List<String> allSendUsers = messageDao.selectByReceiveUser(userId).stream()
          .map(Message::getSendUser).distinct().collect(Collectors.toList());
      for (String sendUser : allSendUsers) {
        // 发送人的用户信息
        MessageForm messageForm = new MessageForm();
        User sendUserData = userDao.selectbyUserId(sendUser);
        if (null == sendUserData) {
          continue;
        }
        // 获取对方发送的信息
        List<Message> receiveMessageList = messageDao.selectBySendUserAndReceiveUserLimitLength(
            sendUser,
            userId, limitMessagesLength);
        // 获取发送给对方的信息
        List<Message> sendMessageList = messageDao.selectBySendUserAndReceiveUserLimitLength(
            userId,
            sendUser, limitMessagesLength);
        List<Message> allMessageList = new ArrayList<>();
        allMessageList.addAll(receiveMessageList);
        allMessageList.addAll(sendMessageList);

        List<Message> sortedMessageList = allMessageList.stream()
            .sorted(Comparator.comparing(Message::getCreateTime))
            .collect(Collectors.toList());
        // 赋值最新消息
        messageForm.setLastMessage(
            sortedMessageList.size() > 0 ? sortedMessageList.get(sortedMessageList.size() - 1)
                .getContent() : "");
        // 赋值聊天记录
        messageForm.setMessages(sortedMessageList);
        // 赋值未读消息
        messageForm.setNoReadMessageLength(
            receiveMessageList.stream().filter(o -> "0".equals(o.getIsRead())).collect(
                Collectors.toList()).size());
        // 赋值发送人
        messageForm.setSendUser(sendUserData);
        // 赋值接收人
        messageForm.setReceiveUser(loginUser);
        // 赋值是否在线
        messageForm.setIsOnline(ids.contains(sendUser));
        messageFormList.add(messageForm);
      }
      // 获取只有自己发送信息给别人的记录的用户
      List<String> allSendToUsers = messageDao.selectBySendUser(userId).stream()
          .map(Message::getReceiveUser).distinct().collect(Collectors.toList());
      for (String receiveUser : allSendToUsers) {
        // 判断messageFormList的sendUser的userId是否包含receiveUser,有则说明已经存在了
        if (messageFormList.stream()
            .anyMatch(o -> o.getSendUser().getUserId().equals(receiveUser))) {
          continue;
        }
        MessageForm messageForm = new MessageForm();
        User receiveUserData = userDao.selectbyUserId(receiveUser);
        if (null == receiveUserData) {
          continue;
        }
        messageForm.setReceiveUser(loginUser);
        messageForm.setSendUser(receiveUserData);
        messageForm.setLastMessage("");
        messageForm.setNoReadMessageLength(0);
        List<Message> sendMessageList = messageDao.selectBySendUserAndReceiveUserLimitLength(userId,
            receiveUser, limitMessagesLength);
        // 按照CreateTime排序从小到大
        messageForm.setMessages(sendMessageList.stream()
            .sorted(Comparator.comparing(Message::getCreateTime))
            .collect(Collectors.toList()));
        messageForm.setIsOnline(ids.contains(receiveUser));
        messageFormList.add(messageForm);
      }
      return messageFormList;
    } catch (Exception e) {
      throw new Exception(e.getMessage());
    }
  }

// 用户区查到的数据,有用户名就查用户名对应用户数据和聊天记录,没有默认查300个用户(配置)
  public List<MessageForm> searchUserForForm(Connection conn, String userId, String username)
      throws Exception {
    try {
      List<MessageForm> messageFormList = new ArrayList<>();
      AssertUtils.isError(StringUtils.isEmpty(userId), "登录用户不能为空!");
      UserDao userDao = new UserDao(conn);
      MessageDao messageDao = new MessageDao(conn);
      User loginUser = userDao.selectbyUserId(userId);
      AssertUtils.isError(null == loginUser, "登录用户不存在!");
      if (StringUtils.isNotEmpty(username)) {
        List<User> userList = userDao.selectByUserName(username);
        Map<String, WebSocket> users = webSocketUtil.getUsers();
        Set<String> ids = users.keySet();
        for (User user : userList) {
          User sendUserData = userDao.selectbyUserId(user.getUserId());
          if (null == sendUserData) {
            continue;
          }
          MessageForm messageForm = new MessageForm();
          messageForm.setSendUser(sendUserData);
          messageForm.setReceiveUser(loginUser);
          messageForm.setIsOnline(ids.contains(user.getUserId()));
          // 获取对方发送的信息
          List<Message> receiveMessageList = messageDao.selectBySendUserAndReceiveUserLimitLength(
              sendUserData.getUserId(), loginUser.getUserId(), limitMessagesLength);
          // 获取发送给对方的信息
          List<Message> sendMessageList = messageDao.selectBySendUserAndReceiveUserLimitLength(
              loginUser.getUserId(), sendUserData.getUserId(), limitMessagesLength);
          List<Message> allMessages = new ArrayList<>();
          allMessages.addAll(receiveMessageList);
          allMessages.addAll(sendMessageList);
          allMessages.sort((o1, o2) -> {
            if (o1.getCreateTime().isBefore(o2.getCreateTime())) {
              return -1;
            } else if (o1.getCreateTime().isAfter(o2.getCreateTime())) {
              return 1;
            } else {
              return 0;
            }
          });
          if (allMessages.size() > 0) {
            messageForm.setLastMessage(allMessages.get(allMessages.size() - 1).getContent());
          } else {
            messageForm.setLastMessage("");
          }
          messageForm.setMessages(allMessages);
          // 获取allMessages中isRead为0的数量
          messageForm.setNoReadMessageLength(
              (int) allMessages.stream().filter(message -> "0".equals(message.getIsRead()))
                  .count());
          messageFormList.add(messageForm);
        }
      } else {
        messageFormList.addAll(findAllMessageForm(conn, userId));
      }
      return messageFormList;
    } catch (Exception e) {
      throw new Exception(e.getMessage());
    }
  }
  
  private List<Message> findBothMessages(String sendUserId, String receiveUserId,
      Integer limitMessageLength,
      MessageDao messageDao) {
    List<Message> receiveMessageList = messageDao.selectBySendUserAndReceiveUserLimitLength(
        receiveUserId,
        sendUserId, limitMessageLength);
    List<Message> sendMessageList = messageDao.selectBySendUserAndReceiveUserLimitLength(
        sendUserId,
        receiveUserId, limitMessageLength);
    List<Message> allMessageList = new ArrayList<>();
    allMessageList.addAll(receiveMessageList);
    allMessageList.addAll(sendMessageList);
    allMessageList.stream().sorted(Comparator.comparing(Message::getCreateTime))
        .collect(Collectors.toList());
    return allMessageList;
  }
}

结语

以上为我用websocket实现聊天通讯的功能逻辑

Logo

前往低代码交流专区

更多推荐