最近项目中需要接收告警提示 故采用了wobsocket来实现消息推送至前端

pom依赖

<!-- WebSocket -->
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-websocket</artifactId>
    <version>2.1.0.RELEASE</version>
</dependency>

2.配置WebSocketConfig

package com.witsky.work.push;

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.web.socket.server.standard.ServerEndpointExporter;

@Configuration
public class WebSocketConfig {

    @Bean
    public ServerEndpointExporter serverEndpointExporter() {
        return new ServerEndpointExporter();
    }
}

3.创建WebSocketServer

package com.witsky.work.push.service;

import com.alibaba.fastjson.JSON;
import com.witsky.core.response.RestResponse;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;

import javax.websocket.*;
import javax.websocket.server.PathParam;
import javax.websocket.server.ServerEndpoint;
import java.io.IOException;
import java.util.concurrent.CopyOnWriteArraySet;

@ServerEndpoint("/push/{sid}")
@Component
@Slf4j
public class WebSocketServer {

    //静态变量,用来记录当前在线连接数。应该把它设计成线程安全的。
    private static int onlineCount = 0;
    //concurrent包的线程安全Set,用来存放每个客户端对应的MyWebSocket对象。
    private static CopyOnWriteArraySet<WebSocketServer> webSocketSet = new CopyOnWriteArraySet<WebSocketServer>();

    //与某个客户端的连接会话,需要通过它来给客户端发送数据
    private Session session;

    //接收sid
    private String sid="";
    /**
     * 连接建立成功调用的方法*/
    @OnOpen
    public void onOpen(Session session,@PathParam("sid") String sid) {
        this.session = session;
        webSocketSet.add(this);     //加入set中
        addOnlineCount();           //在线数加1
        log.info("有新窗口开始监听:"+sid+",当前在线人数为" + getOnlineCount());
        this.sid=sid;
        /*try {
            sendMessage(JSON.toJSONString(RestResponse.success()));
        } catch (IOException e) {
            log.error("websocket IO异常");
        }*/
    }

    /**
     * 连接关闭调用的方法
     */
    @OnClose
    public void onClose() {
        webSocketSet.remove(this);  //从set中删除
        subOnlineCount();           //在线数减1
        log.info("有一连接关闭!当前在线人数为" + getOnlineCount());
    }

    /**
     * 收到客户端消息后调用的方法
     *
     * @param message 客户端发送过来的消息*/
    @OnMessage
    public void onMessage(String message, Session session) {
        //log.info("收到来自窗口"+sid+"的信息:"+message);
        if("heart".equals(message)){
            try {
                sendMessage("heartOk");
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
        //群发消息
       /* for (WebSocketServer item : webSocketSet) {
            try {
                item.sendMessage(message);
            } catch (IOException e) {
                e.printStackTrace();
            }
        }*/
    }

    /**
     *
     * @param session
     * @param error
     */
    @OnError
    public void onError(Session session, Throwable error) {
        log.error("发生错误");
        error.printStackTrace();
    }
    /**
     * 实现服务器主动推送
     */
    public void sendMessage(String message) throws IOException {
        this.session.getBasicRemote().sendText(message);
    }


    /**
     * 群发自定义消息
     * */
    public static void sendInfo(String message) throws IOException {

        for (WebSocketServer item : webSocketSet) {
            try {
                //这里可以设定只推送给这个sid的,为null则全部推送
//                if(sid==null) {

                    item.sendMessage(message);
                log.info("推送消息到窗口"+item.sid+",推送内容:"+message);
//                }else if(item.sid.equals(sid)){
//                    item.sendMessage(message);
//                }
            } catch (IOException e) {
                continue;
            }
        }
    }

    public static synchronized int getOnlineCount() {
        return onlineCount;
    }

    public static synchronized void addOnlineCount() {
        WebSocketServer.onlineCount++;
    }

    public static synchronized void subOnlineCount() {
        WebSocketServer.onlineCount--;
    }

}

4.推送信息

package com.witsky.work.push.controller;

import com.witsky.work.push.service.WebSocketServer;
import io.swagger.annotations.ApiOperation;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.*;

import java.io.IOException;

/**
 * @Author: Administrator
 * @Description:
 * @Date:Create:in 2019/9/27 16:51
 */
@RestController
@Slf4j
public class PushMsgConttroller {

    @Autowired
    WebSocketServer webSocketServer;
    @ApiOperation(value = "报警信息订阅",notes = "查询")
    @ResponseBody
    @RequestMapping(value = "/alarmEventPushWord",method = RequestMethod.POST)
    public void alarmEventPushWord(@RequestBody(required = true) String requestBody) {
        try {
            webSocketServer.sendInfo(requestBody);//接收需要推送的消息 推送给前端
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}

5.前端vue 代码(找前端小姐姐要的代码)  前端链接的地址 开头是需要用  http对应 ws://;

ws://ip:端口/sjqbakend/push/{sid}

<template>
  <div class="app-wrapper">
    <nav-bar></nav-bar>
    <sidebar class="sidebar-container" v-if="isVisible"></sidebar>
    <app-main></app-main>
  </div>
</template>
<script>
import { mapGetters } from 'vuex'
import {NavBar, AppMain, Sidebar} from './components'
export default {
  name: 'Layout',
  data () {
    return {
      isVisible: false,
      websocket: null,
      lockReconnect: false, //避免ws重复连接
      timeout: 60*1000, // 1分钟一次心跳
      timeoutObj: null, // 心跳定时器
      serverTimeoutObj: null, // 心跳定时器
      timeoutnum: null, // 断开 重连的定时器
      notes: []
    }
  },
  components: {
    NavBar,
    AppMain,
    Sidebar
  },
  computed: {
    ...mapGetters([
      'name'
    ]),
  },
  watch: {
    '$route' (to, from) {
      if (to.meta && to.meta.sliderSys) {
        this.isVisible = true
      }else {
        this.isVisible = false
      }
    }
  },
  created() {
    if (this.$route.meta.sliderSys) {
      this.isVisible = true
    }else {
      this.isVisible = false
    }
  },
  mounted() {
    if ('WebSocket' in window) {
      this.initWebSocket()
    } else {
      alert('当前浏览器不支持websocket,无法接收告警推送! ')
    }
  },
  beforeDestroy() {
    if (this.websocket != null) {
      clearTimeout(this.timeoutObj)
      clearTimeout(this.serverTimeoutObj)
      clearTimeout(this.timeoutnum)
      this.timeoutObj = null
      this.serverTimeoutObj = null
      this.timeoutnum = null
      this.onbeforeunload()
    }
  },
  methods: {
    initWebSocket() {
      
      //console.log(url);//'ws://ip:端口/sjqbakend/push/'+ this.name 
      const wsUrl = process.env.WEBSOCKET_URL +'/push/'+ this.name
      if (this.websocket == null) {
        this.websocket = new WebSocket(wsUrl)
      }

      //连接错误
      this.websocket.onerror = this.setErrorMessage
 
      // //连接成功
      this.websocket.onopen = this.setOnopenMessage
 
      //收到消息的回调
      this.websocket.onmessage = this.setOnmessageMessage
 
      //连接关闭的回调
      this.websocket.onclose = this.setOncloseMessage
 
      //监听窗口关闭事件,当窗口关闭时,主动去关闭websocket连接,防止连接还没断开就关闭窗口,server端会抛异常。
      window.onbeforeunload = this.onbeforeunload
    },
    reconnect () { //重新连接
      var that = this;
      if(that.lockReconnect) {
          return;
      };
      that.lockReconnect = true;
      //没连接上会一直重连,设置延迟避免请求过多
      console.log('断开重新连接...');
      that.timeoutnum && clearTimeout(that.timeoutnum);
      that.timeoutnum = setTimeout(function () {
          //新连接
          that.initWebSocket();
          that.lockReconnect = false;
      },10000);
    },
    reset () { // 重置心跳
      var that = this;
      //清除时间
      clearTimeout(that.timeoutObj);
      clearTimeout(that.serverTimeoutObj);
      //重启心跳
      that.start();
    },
    start(){//开启心跳
      var that = this;
      that.timeoutObj && clearTimeout(that.timeoutObj);
      that.serverTimeoutObj && clearTimeout(that.serverTimeoutObj);
      that.timeoutObj = setTimeout(function(){
          //这里发送一个心跳,后端收到后,返回一个心跳消息,
          if (that.websocket.readyState == 1) {//如果连接正常
              that.websocket.send("heart");
          }else{//否则重连
              that.reconnect();
          }
          that.serverTimeoutObj = setTimeout(function() {
              //超时关闭
              that.websocket.close();
          }, that.timeout);

      }, that.timeout)
    },
    setErrorMessage() {
      console.log("WebSocket连接发生错误" + '   状态码:' + this.websocket.readyState)
      this.reconnect() //重连websocket
    },
    setOnopenMessage() {
      console.log("WebSocket连接成功" + '   状态码:' + this.websocket.readyState)
      this.start() //开启心跳
    },
    setOnmessageMessage(event) {
      //const redata = JSON.parse(event.data);
      this.reset() // 收到服务器消息,心跳重置
      const res =  event.data
      if (res !== 'heartOk') {
        console.log('服务端返回:' + res)
        let result = JSON.parse(res)
        for (let i = 0; i < result.length; i++) {
          let _strHtml = `<div class="warn-body">
                            <div style="color:#78cdff;font-size:16px;font-weight:bold;margin-bottom:10px;"><span style="color:#fff;font-weight:normal;">告警时间:</span>${result[i].time}</div>
                            <div style="color:#78cdff;font-size:16px;font-weight:bold;margin-bottom:10px;"><span style="color:#fff;font-weight:normal;">告警位置:</span>${result[i].detectorName?result[i].detectorName:result[i].mainframeName}</div>
                            <div style="color:#78cdff;font-size:16px;font-weight:bold;margin-bottom:10px;"><span style="color:#fff;font-weight:normal;">告警类型:</span>${result[i].alarmTypeName?result[i].alarmTypeName:''}</div>
                            <div style="color:#78cdff;font-size:16px;font-weight:bold;margin-bottom:10px;"><span style="color:#fff;font-weight:normal;">告警数值:</span>${result[i].analogvalue}</div>
                            <div style="color:#78cdff;font-size:16px;font-weight:bold;"><span style="color:#fff;font-weight:normal;">告警内容:</span>${result[i].alarmContent}</div>
                          </div>`
          let noti = this.$notify({
            title: '告警提醒',
            dangerouslyUseHTMLString: true,
            message:  _strHtml,
            position: 'bottom-right',
            duration: 0,
          })
          this.notes.push(noti)
        }
      }
    },
    setOncloseMessage() {
      console.log("WebSocket连接关闭" + '   状态码:' + this.websocket.readyState)
      // console.log(this.$route);
      if (this.$route.meta.noWebsocket) { // 登录页面不需要重连
        clearTimeout(this.timeoutObj)
        clearTimeout(this.serverTimeoutObj)
        clearTimeout(this.timeoutnum)
        this.timeoutObj = null
        this.serverTimeoutObj = null
        this.timeoutnum = null
        this.notes.forEach((item)=>{
          item.close()
        })
      }else {
        this.reconnect() //重连websocket
      }
    },
    onbeforeunload() {
      this.closeWebSocket()
    },
    closeWebSocket() {
      this.websocket.close()
    }
  }
}
</script>
<style scope>
  .app-wrapper {
    width: 100%;
    height: 100%;
  }
  .el-notification {
    width: 415px;
    background-color: rgba(18, 47, 86, 0.95);
    border: none;
    border-radius: 0;
    padding: 0;
  }
  .el-notification.right {
    bottom: 14px !important;
  }
  .el-notification__group{
    margin: 0;
  }
  .el-notification__title {
    color: #ffbb18;
    font-size: 18px;
    margin: 0;
    margin: 20px auto 20px 20px;
  }
  .el-notification__content {
    width: 100%;
    border-top: 1px solid #2b5082;
    margin: 0;
    padding: 28px 30px;

  }
  .el-notification__closeBtn {
    color: #78cdff;
  }
  .el-notification__closeBtn:hover {
    color: #54bbf7;
  }
</style>

我们用的前后端分离 这个推送需要配置nginx  连接过一段时间就会断开  解决方法就是添加心跳  案例中给出的是有添加心跳来控制的  还有在nginx中配置

这个 proxy_read_timeout 默认60s  就是连接60s内没有会话信息那么websock连接就会关闭,心跳设置的时间要小于这个会话过期的时间才可

Logo

前往低代码交流专区

更多推荐