SpringCloud微服务,实现服务端消息推送SSE,有两种方式:“定时长连接保持”和“Emitter事件监听处理”,都需要实现跨域操作。这里选用RabbitMQ监听接收消息,并以SSE方式及时推送到请求前端。

1 前端EventSource请求发起

      if(typeof(EventSource) !== "undefined") {
        var evtSrc = new EventSource('http://localhost:9005/sseMsgSvc");
        evtSrc.onmessage = function (e) {
          ...
        }
      }

2 “定时长连接保持”实现

    @Autowired
    private Receiver rcvr;
    @ResponseBody
    @RequestMapping(value = "/sseMsgSvc", produces="text/event-stream;charset=UTF-8")
    public void sseMsgSvc(HttpServletResponse response) throws Exception {
        response.setContentType("text/event-stream");
        response.setCharacterEncoding("UTF-8");
        response.setHeader("Access-Control-Allow-Origin", "*");
        response.setStatus(200);
        String s = "";
        while(!response.getWriter().checkError()){
            String str = rcvr.getMdlMsg();
            if((str==null)||(str.equals(""))||(str.equals(s))) ;
            else {
                response.getWriter().write("data:" + str + "--" + new Date()+"\n\n");
                response.getWriter().flush();
                System.out.println("Receiver : " + str);
                s = str;
            }
            Thread.sleep(1000);
        }
        response.getWriter().close();
    }
3 “Emitter事件监听处理”实现

    @RequestMapping(value = "/sseMsgSvr", produces="text/event-stream;charset=UTF-8")
    public SseEmitter sseMsgSvr() {
        SseEmitter emitter = new SseEmitter(0L);    // timeout=0, for ever
        try {
           MsgLsnr.addSseEmitters(emitter);
        } catch (Exception e){
            emitter.completeWithError(e);
        }
        return emitter;
    }

相关的消息事件定义

public class MsgEvent extends ApplicationEvent {
    private static final long serialVersionUID = 1L;
    private String message;
 
    public MsgEvent(Object source, String message) {
        super(source);
        this.message = message;
    }

    public String getMessage() { return message; }
    public void setMessage(String message)
    {    this.message = message;    }
}

消息监听实现

@Component
public class MsgLsnr {
    private static List<SseEmitter> sseEmitters = new ArrayList<>();
 
    public static void addSseEmitters(SseEmitter sseEmitter) {
        sseEmitters.add(sseEmitter);
    }
 
    @EventListener
    public void deployEventHandler(MsgEvent msgEvent) throws IOException {
        String message = msgEvent.getMessage();
        SseEmitter sseEmitter = null;
        int m = sseEmitters.size();
        if(m>0) {
            for(int i=0; i<m; i++) {
                sseEmitter = sseEmitters.get(i);
                sseEmitter.send(message);
            }
            //sseEmitter.complete();
        }
    }
}

测试:

4 相关的RabbitMQ消息监听与推送辅助

@Component
public class Receiver {
    private String mdlMsg = "";
    public String getMdlMsg() { return mdlMsg; }
    @Autowired
    ApplicationContext applicationContext;
    @RabbitListener(queues = "rabbit-queue")
    @RabbitHandler
    public void process(String message) {
        mdlMsg = message;
        applicationContext.publishEvent(new MsgEvent(this, message));        
        String sid = wsSvc.getSid();
    }
}

5 结束语

两种实现方式,原理基本一样,“定时长连接保持实现”较为简洁,“Emitter事件监听处理实现”更加灵活。

网关通过方面,spring-cloud-zuul不支持,它集成的是zuul1.x,只有zuul2.x才可以。spring-cloud已经不再升级核心的zuul到最新版本,更好用的spring-cloud-zuul替代品是spring-cloud-geteway。

Logo

权威|前沿|技术|干货|国内首个API全生命周期开发者社区

更多推荐