SpringCloud微服务实现的服务端消息推送SSE[RabbitMQ消息队列]
SpringCloud微服务,实现服务端消息推送SSE,有两种方式:“定时长连接保持”和“Emitter事件监听处理”,都需要实现跨域操作。这里选用RabbitMQ监听接收消息,并以SSE方式及时推送到请求前端。1 前端EventSource请求发起if(typeof(EventSource) !== "undefined") {var evtSrc = new EventSource('http
SpringCloud微服务,实现服务端消息推送SSE,有两种方式:“定时长连接保持”和“Emitter事件监听处理”,都需要实现跨域操作。这里选用RabbitMQ监听接收消息,并以SSE方式及时推送到请求前端。
1 前端EventSource请求发起
if(typeof(EventSource) !== "undefined") {
var evtSrc = new EventSource('http://localhost:9005/sseMsgSvc");
evtSrc.onmessage = function (e) {
...
}
}
2 “定时长连接保持”实现
@Autowired
private Receiver rcvr;
@ResponseBody
@RequestMapping(value = "/sseMsgSvc", produces="text/event-stream;charset=UTF-8")
public void sseMsgSvc(HttpServletResponse response) throws Exception {
response.setContentType("text/event-stream");
response.setCharacterEncoding("UTF-8");
response.setHeader("Access-Control-Allow-Origin", "*");
response.setStatus(200);
String s = "";
while(!response.getWriter().checkError()){
String str = rcvr.getMdlMsg();
if((str==null)||(str.equals(""))||(str.equals(s))) ;
else {
response.getWriter().write("data:" + str + "--" + new Date()+"\n\n");
response.getWriter().flush();
System.out.println("Receiver : " + str);
s = str;
}
Thread.sleep(1000);
}
response.getWriter().close();
}
3 “Emitter事件监听处理”实现
@RequestMapping(value = "/sseMsgSvr", produces="text/event-stream;charset=UTF-8")
public SseEmitter sseMsgSvr() {
SseEmitter emitter = new SseEmitter(0L); // timeout=0, for ever
try {
MsgLsnr.addSseEmitters(emitter);
} catch (Exception e){
emitter.completeWithError(e);
}
return emitter;
}
相关的消息事件定义
public class MsgEvent extends ApplicationEvent {
private static final long serialVersionUID = 1L;
private String message;
public MsgEvent(Object source, String message) {
super(source);
this.message = message;
}
public String getMessage() { return message; }
public void setMessage(String message)
{ this.message = message; }
}
消息监听实现
@Component
public class MsgLsnr {
private static List<SseEmitter> sseEmitters = new ArrayList<>();
public static void addSseEmitters(SseEmitter sseEmitter) {
sseEmitters.add(sseEmitter);
}
@EventListener
public void deployEventHandler(MsgEvent msgEvent) throws IOException {
String message = msgEvent.getMessage();
SseEmitter sseEmitter = null;
int m = sseEmitters.size();
if(m>0) {
for(int i=0; i<m; i++) {
sseEmitter = sseEmitters.get(i);
sseEmitter.send(message);
}
//sseEmitter.complete();
}
}
}
测试:
4 相关的RabbitMQ消息监听与推送辅助
@Component
public class Receiver {
private String mdlMsg = "";
public String getMdlMsg() { return mdlMsg; }
@Autowired
ApplicationContext applicationContext;
@RabbitListener(queues = "rabbit-queue")
@RabbitHandler
public void process(String message) {
mdlMsg = message;
applicationContext.publishEvent(new MsgEvent(this, message));
String sid = wsSvc.getSid();
}
}
5 结束语
两种实现方式,原理基本一样,“定时长连接保持实现”较为简洁,“Emitter事件监听处理实现”更加灵活。
网关通过方面,spring-cloud-zuul不支持,它集成的是zuul1.x,只有zuul2.x才可以。spring-cloud已经不再升级核心的zuul到最新版本,更好用的spring-cloud-zuul替代品是spring-cloud-geteway。
更多推荐
所有评论(0)