Websocket学习(@EnableWebSocket&@ServerEndpoint&websocket认证&nginx配置WebSocket&gateway配置websocket)
WebSocket协议:5分钟从入门到精通https://www.cnblogs.com/chyingp/p/websocket-deep-in.htmlWebSocket协议入门介绍h ttps://www.cnblogs.com/nuccch/p/10947256.html
学习链接
WebSocket协议:5分钟从入门到精通https://www.cnblogs.com/chyingp/p/websocket-deep-in.html
WebSocket协议入门介绍 https://www.cnblogs.com/nuccch/p/10947256.html
java的websocket客户端和服务端库 https://github.com/TooTallNate/Java-WebSocket
对于Tomcat中的websocket的了解 https://blog.csdn.net/zjgyjd/article/details/99686939
基于Tomcat的webSocket的使用方式和源码分析 https://blog.csdn.net/coder_what/article/details/109032940
websocket之三:Tomcat的WebSocket实现 https://www.cnblogs.com/duanxz/p/5041110.html
WebSocket通信原理和在Tomcat中实现源码详解(万字爆肝) https://stefan.blog.csdn.net/article/details/120025498
Springboot使用websocket服务端+客户端(断线重连)https://blog.csdn.net/m0_43449433/article/details/124404374
Nginx配置WebSocket https://blog.csdn.net/qq_43907505/article/details/127694761
【Nginx笔记02】通过Nginx服务器转发客户端的WebSocket接口到后端服务
Gateway集成WebSocket 实现前后端通信(全) https://blog.csdn.net/YonJarLuo/article/details/117957845
springboot整合webSocket:鉴权,心跳检测,wss请求,nginx配置、集群部署
文章目录
spring整合websocket逻辑详细图解
协议过程简介
1、客户端:申请协议升级
首先,客户端发起协议升级请求。可以看到,采用的是标准的HTTP报文格式,且只支持GET方法。
GET / HTTP/1.1
Host: localhost:8080
Origin: http://127.0.0.1:3000
Connection: Upgrade
Upgrade: websocket
Sec-WebSocket-Version: 13
Sec-WebSocket-Key: w4v7O6xFTi36lq3RNcgctw==
重点请求首部意义如下:
Connection: Upgrade:表示要升级协议
Upgrade: websocket:表示要升级到websocket协议。
Sec-WebSocket-Version: 13:表示websocket的版本。如果服务端不支持该版本,需要返回一个Sec-WebSocket-Versionheader,里面包含服务端支持的版本号。
Sec-WebSocket-Key:与后面服务端响应首部的Sec-WebSocket-Accept是配套的,提供基本的防护,比如恶意的连接,或者无意的连接。
注意,上面请求省略了部分非重点请求首部。由于是标准的HTTP请求,类似Host、Origin、Cookie等请求首部会照常发送。在握手阶段,可以通过相关请求首部进行 安全限制、权限校验等。
2、服务端:响应协议升级
服务端返回内容如下,状态代码101表示协议切换。到此完成协议升级,后续的数据交互都按照新的协议来。
HTTP/1.1 101 Switching Protocols
Connection:Upgrade
Upgrade: websocket
Sec-WebSocket-Accept: Oy4NRAQ13jhfONC7bP8dTKb4PTU=
备注:每个header都以\r\n结尾,并且最后一行加上一个额外的空行\r\n。此外,服务端回应的HTTP状态码只能在握手阶段使用。过了握手阶段后,就只能采用特定的错误码。
3、数据帧交互
客户端、服务端数据的交换,离不开数据帧格式的定义。因此,在实际讲解数据交换之前,我们先来看下WebSocket的数据帧格式。
WebSocket客户端、服务端通信的最小单位是帧(frame),由1个或多个帧组成一条完整的消息(message)。
发送端:将消息切割成多个帧,并发送给服务端;
接收端:接收消息帧,并将关联的帧重新组装成完整的消息;
4、连接保持+心跳
WebSocket为了保持客户端、服务端的实时双向通信,需要确保客户端、服务端之间的TCP通道保持连接没有断开。然而,对于长时间没有数据往来的连接,如果依旧长时间保持着,可能会浪费包括的连接资源。
但不排除有些场景,客户端、服务端虽然长时间没有数据往来,但仍需要保持连接。这个时候,可以采用心跳来实现。
发送方->接收方:ping
接收方->发送方:pong
ping、pong的操作,对应的是WebSocket的两个控制帧,opcode分别是0x9、0xA。
举例,WebSocket服务端向客户端发送ping,只需要如下代码(采用ws模块)
ws.ping('', false, true);
最简单的demo
引入Java-WebSocket库
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.zzhua</groupId>
<artifactId>demo-base-websocket</artifactId>
<version>1.0-SNAPSHOT</version>
<dependencies>
<dependency>
<groupId>org.java-websocket</groupId>
<artifactId>Java-WebSocket</artifactId>
<version>1.5.2</version>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.12</version>
</dependency>
</dependencies>
</project>
Java ws服务端
package com.zzhua;
import org.java_websocket.WebSocket;
import org.java_websocket.client.WebSocketClient;
import org.java_websocket.handshake.ClientHandshake;
import org.java_websocket.handshake.ServerHandshake;
import org.java_websocket.server.WebSocketServer;
import org.junit.Test;
import java.net.InetSocketAddress;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.concurrent.locks.LockSupport;
public class Test001 {
@Test
public void test01() {
WebSocketServer webSocketServer = new WebSocketServer(new InetSocketAddress(8080)) {
public void onOpen(WebSocket conn, ClientHandshake handshake) {
System.out.println("onOpen");
}
public void onClose(WebSocket conn, int code, String reason, boolean remote) {
System.out.println("onClose");
}
public void onMessage(WebSocket conn, String message) {
System.out.println("onMessage");
}
public void onError(WebSocket conn, Exception ex) {
System.out.println("onError");
}
public void onStart() {
System.out.println("onStart");
}
};
webSocketServer.run();
}
}
Java ws客户端
package com.zzhua;
import org.java_websocket.WebSocket;
import org.java_websocket.client.WebSocketClient;
import org.java_websocket.handshake.ClientHandshake;
import org.java_websocket.handshake.ServerHandshake;
import org.java_websocket.server.WebSocketServer;
import org.junit.Test;
import java.net.InetSocketAddress;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.concurrent.locks.LockSupport;
public class Test001 {
@Test
public void test02() throws URISyntaxException, InterruptedException {
WebSocketClient webSocketClient = new WebSocketClient(new URI("ws://localhost:8080")) {
public void onOpen(ServerHandshake handshakedata) {
System.out.println("client onOpen");
}
public void onMessage(String message) {
System.out.println("client onMessage");
}
public void onClose(int code, String reason, boolean remote) {
System.out.println("client onClose");
}
public void onError(Exception ex) {
System.out.println("client onError");
}
};
webSocketClient.connect();
Thread.sleep(5000);
LockSupport.park();
}
}
tomcat下使用websocket
引入依赖
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.zzhua</groupId>
<artifactId>demo-tomcat-websocket</artifactId>
<version>1.0-SNAPSHOT</version>
<packaging>war</packaging>
<properties>
<jsp.version>2.1</jsp.version>
<servlet.version>3.1.0</servlet.version>
</properties>
<dependencies>
<dependency>
<groupId>javax.servlet.jsp</groupId>
<artifactId>jsp-api</artifactId>
<version>${jsp.version}</version>
<scope>provided</scope>
</dependency>
<!-- javax.servlet-api -->
<dependency>
<groupId>javax.servlet</groupId>
<artifactId>javax.servlet-api</artifactId>
<version>${servlet.version}</version>
<scope>provided</scope>
</dependency>
<!-- https://mvnrepository.com/artifact/javax.websocket/javax.websocket-api -->
<dependency>
<groupId>javax.websocket</groupId>
<artifactId>javax.websocket-api</artifactId>
<version>1.1</version>
<scope>provided</scope>
</dependency>
</dependencies>
<build>
<finalName>0520carrent</finalName>
<plugins>
<!-- 配置tomcat的运行插件 -->
<plugin>
<groupId>org.apache.tomcat.maven</groupId>
<artifactId>tomcat7-maven-plugin</artifactId>
<version>2.2</version>
<configuration>
<!-- 配置端口 -->
<port>8080</port>
<!-- 配置urlencoding -->
<uriEncoding>UTF-8</uriEncoding>
<path>/</path>
</configuration>
</plugin>
<!-- 配置jdk的编译版本 -->
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.7.0</version>
<configuration>
<!-- 指定source和target的版本 -->
<source>1.8</source>
<target>1.8</target>
</configuration>
</plugin>
</plugins>
</build>
</project>
web.xml
<?xml version="1.0" encoding="UTF-8"?>
<web-app xmlns="http://xmlns.jcp.org/xml/ns/javaee"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://xmlns.jcp.org/xml/ns/javaee
http://xmlns.jcp.org/xml/ns/javaee/web-app_3_1.xsd"
version="3.1">
</web-app>
MyServlet
@WebServlet(urlPatterns = "/my")
public class MyServlet extends HttpServlet {
@Override
protected void doGet(HttpServletRequest req, HttpServletResponse resp) throws ServletException, IOException {
for (MyWebSocketEndpoint myWebSocketEndpoint : MyWebSocketEndpoint.getConnections()) {
myWebSocketEndpoint.getSession().getBasicRemote().sendText("halo");
}
resp.getWriter().write("hello world");
}
}
MyWebSocketEndpoint
package com.zzhua.ws;
import javax.websocket.*;
import javax.websocket.server.ServerEndpoint;
import java.io.IOException;
import java.util.Set;
import java.util.concurrent.CopyOnWriteArraySet;
import java.util.concurrent.atomic.AtomicInteger;
@ServerEndpoint(value="/websocket")
public class MyWebSocketEndpoint {
private static final AtomicInteger counter = new AtomicInteger(0); // 客户端计数器
private static final Set<MyWebSocketEndpoint> connections = new CopyOnWriteArraySet<MyWebSocketEndpoint>(); // 客户端websocket连接集合
private Session session = null; // WebSocket会话对象
private Integer number = 0; // 客户端编号
public MyWebSocketEndpoint() {
number = counter.incrementAndGet();
}
/**
* 客户端建立websocket连接
* @param session
*/
@OnOpen
public void start(Session session) {
System.out.println("on open");
this.session = session;
connections.add(this);
try {
session.getBasicRemote().sendText(new StringBuffer().append("Hello: ").append(number).toString());
} catch (IOException e) {
e.printStackTrace();
}
}
/**
* 客户端断开websocket连接
*/
@OnClose
public void close() {
System.out.println("session close");
try {
this.session.close();
} catch (IOException e) {
e.printStackTrace();
} finally {
connections.remove(this);
}
}
/**
* 接收客户端发送的消息
* @param message
*/
@OnMessage
public void message(String message) {
System.out.println("message: "+ message);
for(MyWebSocketEndpoint client : connections) {
synchronized (client) {
try {
client.session.getBasicRemote().sendText(message);
} catch (IOException e) {
e.printStackTrace();
}
}
}
}
@OnError
public void error(Throwable t) {
System.out.println("client: error"+ number+t.getMessage());
}
public static Set<MyWebSocketEndpoint> getConnections() {
return connections;
}
public Session getSession() {
return session;
}
}
html的websocket客户端
客户端主要事件和方法
new WebSocket
新建websocket连接onopen
监听t连接打开事件onmessage
有消息时触发onclose
连接关闭t时触发onerror
连接出错时触发send
发送消息close
关闭连接
<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="UTF-8">
<title>Title</title>
</head>
<body>
请输入: <input id="content" type="text"/>
<button onclick="sendMsg()">发送</button>
<button onclick="closeWebsocket()">关闭</button>
<ul id="msgList">
</ul>
</body>
<script>
/* 参考: https://developer.mozilla.org/zh-CN/docs/Web/API/WebSocket */
// var socket = new WebSocket('ws://localhost:8080/websocket01');
var socket = new WebSocket('ws://localhost:8080/websocket/user001');
// var socket = new WebSocket('ws://[[${ip}]]:8080/websocket01');
// 指定连接成功后的回调
socket.onopen = function (event) {
console.log("建立连接成功")
}
// 发送消息给服务器
function sendMsg() {
var content = document.querySelector('#content');
socket.send(content.value) // 使用websocket发送消息到服务器
}
// 收到服务器的消息时的回调
socket.onmessage = function (ev) {
console.log("收到服务器消息: " + JSON.stringify(ev))
var ul = document.querySelector('#msgList');
var li = document.createElement('li');
li.innerText = ev.data
ul.appendChild(li)
}
// 手动关闭websocket
function closeWebsocket() {
socket.close()
}
// 指定连接关闭后的回调
socket.onclose = function (event) {
console.log("连接关闭")
}
</script>
</html>
Springboot整合websocket-1(@ServerEndpoint)
引入依赖
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.1.8.RELEASE</version>
<relativePath/> <!-- lookup parent from repository -->
</parent>
<groupId>com.zzhua</groupId>
<artifactId>demo-springboot</artifactId>
<version>0.0.1-SNAPSHOT</version>
<name>demo-springboot</name>
<description>Demo project for Spring Boot</description>
<properties>
<java.version>1.8</java.version>
</properties>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-websocket</artifactId>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
</plugin>
</plugins>
</build>
</project>
WsConfig
@Configuration
public class WsConfig {
@Bean
public ServerEndpointExporter serverEndpointExporter(){
return new ServerEndpointExporter();
}
}
MyWebSocketEndpoint
package com.zzhua.ws;
import org.springframework.stereotype.Component;
import javax.websocket.*;
import javax.websocket.server.PathParam;
import javax.websocket.server.ServerEndpoint;
import java.io.IOException;
import java.util.Set;
import java.util.concurrent.CopyOnWriteArraySet;
import java.util.concurrent.atomic.AtomicInteger;
/*
1. @ServerEndpoint用于标记被修饰的类是一个 websocket服务器 的一个端点,且被标注的类必须有一个public的无参构造方法
2. @ServerEndpoint可以指定 端点发布的路径, 和其它重要的属性,如encoders编码器、decoders解码器
3. 端点发布的路径可以带路径变量,并且该路径变量名可以在@OnOpen和@OnClose标注的方法的方法参数中使用
如:发布路径是@ServerEndpoint(value="/websocket/{userId}"),则在@OnOpen修饰的方法中可以使用@PathParam("userId"),
4. @ServerEndpoint允许指定自定义配置器 ServerEndpointConfig.Configurator
*/
@Component
@ServerEndpoint(value="/websocket/{userId}/{username}")
public class MyWebSocketEndpoint {
private static final AtomicInteger counter = new AtomicInteger(0); // 客户端计数器
private static final Set<MyWebSocketEndpoint> connections = new CopyOnWriteArraySet<MyWebSocketEndpoint>(); // 客户端websocket连接集合
private Session session = null; // WebSocket会话对象
private Integer number = 0; // 客户端编号
public MyWebSocketEndpoint() {
number = counter.incrementAndGet();
}
/**
* 客户端建立websocket连接
* @param session
*/
@OnOpen
public void start(Session session, @PathParam("userId") String userId, @PathParam("username") String username) {
System.out.println("on open");
this.session = session;
connections.add(this);
try {
session.getBasicRemote().sendText(new StringBuffer().append("Hello: ").append(number).toString());
} catch (IOException e) {
e.printStackTrace();
}
}
/**
* 客户端断开websocket连接
*/
@OnClose
public void close(@PathParam("userId") String userId) {
System.out.println("session close");
try {
this.session.close();
} catch (IOException e) {
e.printStackTrace();
} finally {
connections.remove(this);
}
}
/**
* 接收客户端发送的消息
* @param message
*/
@OnMessage
public void message(String message,@PathParam("userId") String userId,Session session) {
System.out.println("message: "+ message);
for(MyWebSocketEndpoint client : connections) {
synchronized (client) {
try {
client.session.getBasicRemote().sendText(message);
} catch (IOException e) {
e.printStackTrace();
}
}
}
}
@OnError
public void error(Throwable t) {
System.out.println("client: error"+ number+t.getMessage());
}
public static Set<MyWebSocketEndpoint> getConnections() {
return connections;
}
public Session getSession() {
return session;
}
}
html的websocket客户端
<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="UTF-8">
<title>Title</title>
</head>
<body>
请输入: <input id="content" type="text"/>
<button onclick="sendMsg()">发送</button>
<button onclick="closeWebsocket()">关闭</button>
<ul id="msgList">
</ul>
</body>
<script>
/* 参考: https://developer.mozilla.org/zh-CN/docs/Web/API/WebSocket */
// var socket = new WebSocket('ws://localhost:8080/websocket01');
var socket = new WebSocket('ws://localhost:8080/websocket/user001/username001');
// var socket = new WebSocket('ws://[[${ip}]]:8080/websocket01');
// 指定连接成功后的回调
socket.onopen = function (event) {
console.log("建立连接成功")
}
// 发送消息给服务器
function sendMsg() {
var content = document.querySelector('#content');
socket.send(content.value) // 使用websocket发送消息到服务器
}
// 收到服务器的消息时的回调
socket.onmessage = function (ev) {
console.log("收到服务器消息: " + JSON.stringify(ev))
var ul = document.querySelector('#msgList');
var li = document.createElement('li');
li.innerText = ev.data
ul.appendChild(li)
}
// 手动关闭websocket
function closeWebsocket() {
socket.close()
}
// 指定连接关闭后的回调
socket.onclose = function (event) {
console.log("连接关闭")
}
</script>
</html>
这样建立websocket连接是比较容易,但是不能所有客户端都能连接服务端吧,比如只有登录了,才能成功建立websocket连接,这样才比较合理。如果采用这种整合方式,就必须知道它建立ws连接的过程。它是依靠WsFilter过滤器拦截所有请求,判断是不是切换ws协议,如果是,则升级协议,完成握手。所以,
1、要么我们重写WsFilter的doFilter,先进行登录状态验证,再调用父类。
2、要么在WsFilter前,再插入一个过滤器,验证通过了,再交给WsFilter处理,这样WsFilter它就不用操心认证的事了。
3、要么配置一个自定义的ServerEndpointConfig.Configurator,重写里面的modifyHandshake方法,这里面是可以拿到请求相关的信息的。但是不方便的是,这个自定义配置器对象,是通过反射创建的,并不是由spring管理的,虽然我们可以通过拿到各种手段拿到spring容器的方式实现解耦,这是能想到的最简单的方式了。
Springboot整合websocket-2(@EnableWebsocket)
看这个具体的实现之前,肯定是需要先看懂上面的实现的具体过程的,也就是说,上面相当于是使用原生的tomcat,只不过springboo帮我们注册端点类。而这里,spring抽象出了处理websocket的接口,屏蔽了各种web容器间的实现差异,里面也用到了很多的装饰者模式,不得不佩服spring的抽象能力哇。难怪之前我看的懵逼,会想:为什么有的时候上面这样写,有的时候下面那样写?到底谁优谁劣?它们实现一不一样?这2种写法能不能一起使用?谁优谁劣,一眼就可以看出来了,当然是下面这种更好啦,它又回归到我们熟悉的spring容器咯~
而且这里面实现也是参考了@EnableWebMvc一样的配置原理,并且处理过程也借鉴了springmvc的处理流程,它的握手流程也是通过springmvc的HttpRequestHandler类型的处理器(WebSocketHttpRequestHandler)介入交给握手处理器AbstractHandshakeHandler,握手处理器里面通过协议升级策略方式来屏蔽不同web容器的差异。
源码初探
在查看源码的时候,上面这种方式和下面这种方式,握手后的协议升级最终都会到UpgradeUtil#doUpgrade这个方法,这个比较关键,在这个doUpgrade方法里,会调用HttpServletRequest#upgrade(WsHttpUpgradeHandler.class)创建一个WsHttpUpgradeHandler对象(在协议升级后,接收到消息时,也是用这个对象处理的),
源码还可以留意到:AbstractProtocol$ConnectionHandler(静态内部类)#process方法和connections属性。在process里面,由“不变的processor”处理请求,如果是协议升级请求,则处理后会返回UPGRADING状态,这其中就包含了协议升级过程,processor中可以得到UpgradeToken,然后创建出一个新的processor放入connections中,再从UpgradeTken中获取httpUpgradeHandler初始化这个新的processor。由于connections中的key就是socket,当socket中有数据来了,就可以找到对应的processor,交给此processor处理了。并且 我们还注意这里面是有个while循环的,跳出的条件是返回的状态不为SocketState.UPGRADING,所以此时新的processor会立马执行一遍processor.process(wrapper, status),相当于前面这一步是在握手,握手之后的这一步是在通知连接建立成功,debug的时候,注意这一点就好了
引入依赖
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.1.8.RELEASE</version>
<relativePath/> <!-- lookup parent from repository -->
</parent>
<groupId>com.zzhua</groupId>
<artifactId>demo-springboot</artifactId>
<version>0.0.1-SNAPSHOT</version>
<name>demo-springboot</name>
<description>Demo project for Spring Boot</description>
<properties>
<java.version>1.8</java.version>
</properties>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-websocket</artifactId>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
</plugin>
</plugins>
</build>
</project>
WebsocketConfig
@Configuration
@EnableWebSocket
public class WebsocketConfig implements WebSocketConfigurer {
public static Map<String, WebSocketSession> sessionMap = new ConcurrentHashMap<>();
@Override
public void registerWebSocketHandlers(WebSocketHandlerRegistry registry) {
registry
.addHandler(new TextWebSocketHandler() { // WebSocketHandler接口
@Override
public void afterConnectionEstablished(WebSocketSession session) throws Exception {
System.out.println("建立websocket连接后的回调," + session + "," + this + RequestContextHolder.getRequestAttributes());
// 存入sessionMap,实际应该关联到用户
String name = (String) session.getAttributes().get("name");
System.out.println("与: {}建立链接成功" + name);
sessionMap.put(name, session);
}
@Override
public void handleMessage(WebSocketSession session, WebSocketMessage<?> message) throws Exception {
System.out.println("收到websocket消息," + session + "," + message+ RequestContextHolder.getRequestAttributes());
}
@Override
public void afterConnectionClosed(WebSocketSession session, CloseStatus status) throws Exception {
System.out.println("websocket连接关闭后回调" + "," + status + RequestContextHolder.getRequestAttributes());
}
@Override
public void handleTransportError(WebSocketSession session, Throwable exception) throws Exception {
System.out.println("连接发生错误被回调");
}
}, "/websocket01")
.addInterceptors(new HandshakeInterceptor() { // HandshakeInterceptor接口
@Override // 这里回传的ServerHttpRequest(实现类为ServletServerHttpRequest)包裹了HttpServletRequest
public boolean beforeHandshake(ServerHttpRequest request, ServerHttpResponse response, WebSocketHandler wsHandler, Map<String, Object> attributes) throws Exception {
System.out.println("握手前的回调," + wsHandler + ","+ attributes + "," + RequestContextHolder.getRequestAttributes());
// 也可以从session中获取用户信息(也可以使用请求头放token,只要能获取到用户都ok)
String name = ((ServletServerHttpRequest) request).getServletRequest().getParameter("name");
System.out.println("握手拦截器,获取name: {}"+name);
synchronized (this) {
if (!sessionMap.containsKey(name)) {
attributes.put("name", name);
System.out.println("sessionMap中不包含此name:{},允许建立链接"+name);
return true;
}
System.out.println("sessionMap中已包含此name:{}"+name);
return false;
}
}
@Override
public void afterHandshake(ServerHttpRequest request, ServerHttpResponse response, WebSocketHandler wsHandler, Exception exception) {
System.out.println("握手后回调, " + "," + wsHandler + "," + exception + RequestContextHolder.getRequestAttributes());
}
})
.setHandshakeHandler(new MyHandShakeHandler()) // HandshakeHandler
.setAllowedOrigins("*")
// 开启sockJs,记得把websocketEnabled也给开启了,否则客户端只能使用sockJs,而不能使用websocket
// .withSockJS()
// .setWebSocketEnabled(true)
;
}
static class MyHandShakeHandler implements HandshakeHandler {
// 借助Spring提供的DefaultHandshakeHandler完成握手(仅仅包裹了一下spring封装的DefaultShakeHandler)
private DefaultHandshakeHandler defaultHandshakeHandler = new DefaultHandshakeHandler();
@Override
public boolean doHandshake(ServerHttpRequest request, ServerHttpResponse response, WebSocketHandler wsHandler, Map<String, Object> attributes) throws HandshakeFailureException {
System.out.println("开始握手..." + RequestContextHolder.getRequestAttributes());
boolean flag = defaultHandshakeHandler.doHandshake(request, response, wsHandler, attributes);
System.out.println("握手完成..." + flag);
return flag;
}
}
}
html的websocket客户端
<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="UTF-8">
<title>Title</title>
</head>
<body>
请输入: <input id="content" type="text"/>
<button onclick="sendMsg()">发送</button>
<button onclick="closeWebsocket()">关闭</button>
<ul id="msgList">
</ul>
</body>
<script>
/* 参考: https://developer.mozilla.org/zh-CN/docs/Web/API/WebSocket */
// var socket = new WebSocket('ws://localhost:8080/websocket01');
// var socket = new WebSocket('ws://localhost:8080/websocket/user001/username001');
var socket = new WebSocket('ws://localhost:8080/websocket01?name=zzhua');
// var socket = new WebSocket('ws://[[${ip}]]:8080/websocket01');
// 指定连接成功后的回调
socket.onopen = function (event) {
console.log("建立连接成功")
}
// 发送消息给服务器
function sendMsg() {
var content = document.querySelector('#content');
socket.send(content.value) // 使用websocket发送消息到服务器
}
// 收到服务器的消息时的回调
socket.onmessage = function (ev) {
console.log("收到服务器消息: " + JSON.stringify(ev))
var ul = document.querySelector('#msgList');
var li = document.createElement('li');
li.innerText = ev.data
ul.appendChild(li)
}
// 手动关闭websocket
function closeWebsocket() {
socket.close()
}
// 指定连接关闭后的回调
socket.onclose = function (event) {
console.log("连接关闭")
}
</script>
</html>
实际应用时的注意点
这一块已经梳理好了,可以直接参考这篇:Spring整合tomcat的WebSocket详细逻辑(图解)
携带id或token
websocket不可能让所有的客户端都能随随便便就能连上,需要携带token或者id
- 直接带在url的query上
- 带在请求头上(利用WebSocket的构造参数的第二个参数-子协议(视为token)-可参考:学习风`宇blog的websocket模块)
- cookie
心跳检测
我们需要定期向websocket发送消息,看有没有回信(每隔半分钟,向服务器发送一个消息)
定时重连
如果链接出错或者中断,我们需要重连,并且也不能无限制的重连(达到最大重连次数后,给出错误提示)
nginx配置WebSocket
server{
# 监听的端口号
listen 9095;
server_name robotchat.lukeewin.top; # 这里填写的是访问的域名
location / {
proxy_pass http://127.0.0.1:9090; # 这里填写的是代理的路径和端口
proxy_set_header Host $host;
proxy_set_header X-Real_IP $remote_addr;
proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
}
# 以下配置针对websocket
location /ws { # onlineCount为websocket的访问uri
proxy_redirect off;
proxy_pass http://127.0.0.1:9090;
proxy_set_header Host $host;
proxy_set_header X-Real_IP $remote_addr;
proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
proxy_http_version 1.1;
proxy_read_timeout 36000s;
proxy_send_timeout 36000s;
proxy_set_header Upgrade $http_upgrade; # 升级协议头 websocket
proxy_set_header Connection "upgrade";
}
}
如果没写如下语句,则会报EOFException
proxy_read_timeout 36000s;
proxy_send_timeout 36000s;
添加如下三行语句,才能在后台中拿到真实的ip地址。
proxy_set_header Host $host;
proxy_set_header X-Real_IP $remote_addr;
proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
获取真实ip地址代码如下。
public String getRealIp(HttpServletRequest request) {
String ip = request.getHeader("X-Forwarded-For");
if (StringUtils.isNotEmpty(ip) && !"unKnown".equalsIgnoreCase(ip)) {
int index = ip.indexOf(",");
if (index != -1) {
return ip.substring(0, index);
} else {
return ip;
}
}
ip = request.getHeader("X-Real-IP");
if (StringUtils.isNotEmpty(ip) && !"unKnown".equalsIgnoreCase(ip)) {
return ip;
}
return request.getRemoteAddr();
}
spring-websocket+Java客户端的实现
1.业务场景描述
这个案例主要是用来监控用的,就是把接口中的数据实时的展示到页面上,比如一个下订单的接口,在外界访问订单接口时,订单存入到数据库后需要将订单数据展示到页面上。因此就可以在接口中使用Java客户端将订单信息发送到websocket服务器,然后页面作为客户端把接口订单数据暂时到页面上。在接口中将Java客户端的连接传入接口中,直接通过连接发送订单信息到服务器。连接websocket的客户端代码通过静态代码块的方式加载,然后将建立连接后的websocket连接存到静态变量,在接口中通过静态变量,就可以获取websocket连接,进而达到在接口中发送订单信息的目的。
2.所需jar包
服务端:spring-websocket,spring-messaging
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-websocket</artifactId>
<version>${spring.version}</version>
</dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-messaging</artifactId>
<version>${spring.version}</version>
</dependency>
客户端:javax.websocket-api,tyrus-standalone-client
<dependency>
<groupId>javax.websocket</groupId>
<artifactId>javax.websocket-api</artifactId>
<version>1.1</version>
</dependency>
<dependency>
<groupId>org.glassfish.tyrus.bundles</groupId>
<artifactId>tyrus-standalone-client</artifactId>
<version>1.9</version>
</dependency>
3.websocket配置文件
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.annotation.EnableAsync;
import org.springframework.web.servlet.config.annotation.EnableWebMvc;
import org.springframework.web.servlet.config.annotation.WebMvcConfigurerAdapter;
import org.springframework.web.socket.WebSocketHandler;
import org.springframework.web.socket.config.annotation.EnableWebSocket;
import org.springframework.web.socket.config.annotation.WebSocketConfigurer;
import org.springframework.web.socket.config.annotation.WebSocketHandlerRegistry;
/**
* websocket配置文件
* 有了该配置文件,就不用在spring-mvc.xml中进行websocket的配置
* EnableWebSocket注解 :开启websocket服务
* @author cm
*/
@Configuration
@EnableWebMvc
@EnableWebSocket
public class WebSocketConfig extends WebMvcConfigurerAdapter implements WebSocketConfigurer {
/**
*添加处理器和拦截器,处理器后面的地址就是websocket的访问路径
* setAllowedOrigins:指定的域名或IP,如果不限制使用"*"就可以了
* @param registry
*/
@Override
public void registerWebSocketHandlers(WebSocketHandlerRegistry registry) {
registry.addHandler(myWebSocketHandler(), "/websocket/demo")
.addInterceptors(myInterceptor()).setAllowedOrigins("*");
}
/**
* 直接注入自己定义的websocket处理器
* @return
*/
@Bean
public WebSocketHandler myWebSocketHandler(){
return new MyWebSocketHandler();
}
/**
* 直接注入自己定义的websocket拦截器
* @return
*/
@Bean
public WebSocketHandshakeInterceptor myInterceptor(){
return new WebSocketHandshakeInterceptor();
}
}
4.websocket拦截器
import org.springframework.http.server.ServerHttpRequest;
import org.springframework.http.server.ServerHttpResponse;
import org.springframework.web.socket.WebSocketHandler;
import org.springframework.web.socket.server.HandshakeInterceptor;
import java.util.Map;
/**
* websocket拦截器:一般情况下不做处理
*/
public class WebSocketHandshakeInterceptor implements HandshakeInterceptor {
@Override
public void afterHandshake(ServerHttpRequest arg0, ServerHttpResponse arg1, WebSocketHandler arg2, Exception arg3) {
}
@Override
public boolean beforeHandshake(ServerHttpRequest arg0, ServerHttpResponse arg1, WebSocketHandler arg2,
Map<String, Object> arg3) throws Exception {
return true;
}
}
5.websocket处理器
import org.springframework.scheduling.annotation.Async;
import org.springframework.web.socket.*;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.*;
/**
* websocket处理器:功能实现的核心代码编写类
* @author cm
*/
public class MyWebSocketHandler implements WebSocketHandler{
/**
* 定义一个全局的初始化值count=0,记录连接数
*/
private static int count = 0;
/**
* 记录所有的客户端连接
*/
private volatile static List<WebSocketSession> sessions = Collections.synchronizedList(new ArrayList());
@Override
public void afterConnectionClosed(WebSocketSession session, CloseStatus closeStatus) throws Exception {
sessions.remove(session);
if (session.isOpen()){
session.close();
}
count = count-1;
System.out.println(count);
}
/**
* 建立连接后的操作
* @param session
* @throws Exception
*/
@Override
public void afterConnectionEstablished(WebSocketSession session) throws Exception {
count++;
sessions.add(session);
}
/**
* 消息处理,在客户端通过Websocket API发送的消息会经过这里,然后进行相应的处理
* @param session
* @param message
* @throws Exception
*/
@Override
public void handleMessage(WebSocketSession session, WebSocketMessage<?> message) throws Exception {
String data = message.getPayload().toString();
System.out.println("客户端" + message.getPayload().toString());
if(!data.equals("B")) {
sessions.remove(session);
try {
for (WebSocketSession ws : sessions) {
if (ws.isOpen()) {
synchronized (ws) {
ws.sendMessage(new TextMessage("" + data));
System.out.println("服务端" + data);
}
}
}
} catch (IOException e) {
e.printStackTrace();
}
}
}
/**
* 消息传输错误处理
* @param session
* @param throwable
* @throws Exception
*/
@Override
public void handleTransportError(WebSocketSession session, Throwable throwable) throws Exception {
if(session.isOpen()){
sessions.remove(session);
session.close();
}
}
@Override
public boolean supportsPartialMessages() {
return false;
}
}
6.Java客户端代码
import javax.websocket.*;
@ClientEndpoint
public class Client{
@OnOpen
public void onOpen(Session session) {
System.out.println("Connected to endpoint: " + session.getBasicRemote());
}
@OnMessage
public void onMessage(String message) {
System.out.println(message);
}
@OnError
public void onError(Throwable t) {
t.printStackTrace();
}
}
7.静态代码块
import javax.websocket.ContainerProvider;
import javax.websocket.DeploymentException;
import javax.websocket.Session;
import javax.websocket.WebSocketContainer;
import java.io.IOException;
import java.net.URI;
public class TestClient {
public static Session session = null;
static{
WebSocketContainer container = ContainerProvider.getWebSocketContainer();
String uri ="ws://localhost:8080/websocket/demo";
System.out.println("Connecting to 2"+ uri);
try {
session = container.connectToServer(Client.class,URI.create(uri));
} catch (DeploymentException e) {
e.printStackTrace();
} catch (IOException e) {
e.printStackTrace();
}
}
代码经过测试,可用~
8.接口中调用
TestClient.session.getBasicRemote().sendText("hello");
说明:以上3.4.5为服务端代码,6.7.8为客户端代码
更多推荐
所有评论(0)