学习链接

WebSocket协议:5分钟从入门到精通https://www.cnblogs.com/chyingp/p/websocket-deep-in.html

WebSocket协议入门介绍 https://www.cnblogs.com/nuccch/p/10947256.html

java的websocket客户端和服务端库 https://github.com/TooTallNate/Java-WebSocket

对于Tomcat中的websocket的了解 https://blog.csdn.net/zjgyjd/article/details/99686939

基于Tomcat的webSocket的使用方式和源码分析 https://blog.csdn.net/coder_what/article/details/109032940

websocket之三:Tomcat的WebSocket实现 https://www.cnblogs.com/duanxz/p/5041110.html

WebSocket通信原理和在Tomcat中实现源码详解(万字爆肝) https://stefan.blog.csdn.net/article/details/120025498

Springboot使用websocket服务端+客户端(断线重连)https://blog.csdn.net/m0_43449433/article/details/124404374

Nginx配置WebSocket https://blog.csdn.net/qq_43907505/article/details/127694761

Gateway集成WebSocket 实现前后端通信(全) https://blog.csdn.net/YonJarLuo/article/details/117957845

spring-websocket+Java客户端的实现

spring整合websocket逻辑详细图解

在这里插入图片描述

协议过程简介

在这里插入图片描述

1、客户端:申请协议升级

首先,客户端发起协议升级请求。可以看到,采用的是标准的HTTP报文格式,且只支持GET方法。

GET / HTTP/1.1
Host: localhost:8080
Origin: http://127.0.0.1:3000
Connection: Upgrade
Upgrade: websocket
Sec-WebSocket-Version: 13
Sec-WebSocket-Key: w4v7O6xFTi36lq3RNcgctw==

重点请求首部意义如下:

Connection: Upgrade:表示要升级协议

Upgrade: websocket:表示要升级到websocket协议。

Sec-WebSocket-Version: 13:表示websocket的版本。如果服务端不支持该版本,需要返回一个Sec-WebSocket-Versionheader,里面包含服务端支持的版本号。

Sec-WebSocket-Key:与后面服务端响应首部的Sec-WebSocket-Accept是配套的,提供基本的防护,比如恶意的连接,或者无意的连接。

注意,上面请求省略了部分非重点请求首部。由于是标准的HTTP请求,类似Host、Origin、Cookie等请求首部会照常发送。在握手阶段,可以通过相关请求首部进行 安全限制、权限校验等。

2、服务端:响应协议升级

服务端返回内容如下,状态代码101表示协议切换。到此完成协议升级,后续的数据交互都按照新的协议来。

HTTP/1.1 101 Switching Protocols
Connection:Upgrade
Upgrade: websocket
Sec-WebSocket-Accept: Oy4NRAQ13jhfONC7bP8dTKb4PTU=

备注:每个header都以\r\n结尾,并且最后一行加上一个额外的空行\r\n。此外,服务端回应的HTTP状态码只能在握手阶段使用。过了握手阶段后,就只能采用特定的错误码。

3、数据帧交互

客户端、服务端数据的交换,离不开数据帧格式的定义。因此,在实际讲解数据交换之前,我们先来看下WebSocket的数据帧格式。

WebSocket客户端、服务端通信的最小单位是帧(frame),由1个或多个帧组成一条完整的消息(message)。

发送端:将消息切割成多个帧,并发送给服务端;
接收端:接收消息帧,并将关联的帧重新组装成完整的消息;

4、连接保持+心跳

WebSocket为了保持客户端、服务端的实时双向通信,需要确保客户端、服务端之间的TCP通道保持连接没有断开。然而,对于长时间没有数据往来的连接,如果依旧长时间保持着,可能会浪费包括的连接资源。

但不排除有些场景,客户端、服务端虽然长时间没有数据往来,但仍需要保持连接。这个时候,可以采用心跳来实现。

发送方->接收方:ping
接收方->发送方:pong
ping、pong的操作,对应的是WebSocket的两个控制帧,opcode分别是0x9、0xA。

举例,WebSocket服务端向客户端发送ping,只需要如下代码(采用ws模块)

ws.ping('', false, true);

最简单的demo

引入Java-WebSocket库

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.zzhua</groupId>
    <artifactId>demo-base-websocket</artifactId>
    <version>1.0-SNAPSHOT</version>

    <dependencies>

        <dependency>
            <groupId>org.java-websocket</groupId>
            <artifactId>Java-WebSocket</artifactId>
            <version>1.5.2</version>
        </dependency>

        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>4.12</version>
        </dependency>

    </dependencies>


</project>

Java ws服务端

package com.zzhua;

import org.java_websocket.WebSocket;
import org.java_websocket.client.WebSocketClient;
import org.java_websocket.handshake.ClientHandshake;
import org.java_websocket.handshake.ServerHandshake;
import org.java_websocket.server.WebSocketServer;
import org.junit.Test;

import java.net.InetSocketAddress;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.concurrent.locks.LockSupport;

public class Test001 {
    @Test
    public void test01() {

        WebSocketServer webSocketServer = new WebSocketServer(new InetSocketAddress(8080)) {

            public void onOpen(WebSocket conn, ClientHandshake handshake) {
                System.out.println("onOpen");
            }

            public void onClose(WebSocket conn, int code, String reason, boolean remote) {
                System.out.println("onClose");
            }

            public void onMessage(WebSocket conn, String message) {
                System.out.println("onMessage");
            }

            public void onError(WebSocket conn, Exception ex) {
                System.out.println("onError");
            }

            public void onStart() {
                System.out.println("onStart");
            }
        };

        webSocketServer.run();

    }
 
}

Java ws客户端

package com.zzhua;

import org.java_websocket.WebSocket;
import org.java_websocket.client.WebSocketClient;
import org.java_websocket.handshake.ClientHandshake;
import org.java_websocket.handshake.ServerHandshake;
import org.java_websocket.server.WebSocketServer;
import org.junit.Test;

import java.net.InetSocketAddress;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.concurrent.locks.LockSupport;

public class Test001 {

    @Test
    public void test02() throws URISyntaxException, InterruptedException {

        WebSocketClient webSocketClient = new WebSocketClient(new URI("ws://localhost:8080")) {
            public void onOpen(ServerHandshake handshakedata) {
                System.out.println("client onOpen");
            }

            public void onMessage(String message) {
                System.out.println("client onMessage");
            }

            public void onClose(int code, String reason, boolean remote) {
                System.out.println("client onClose");
            }

            public void onError(Exception ex) {
                System.out.println("client onError");
            }
        };

        webSocketClient.connect();
        Thread.sleep(5000);
        LockSupport.park();

    }

}

tomcat下使用websocket

在这里插入图片描述

引入依赖

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.zzhua</groupId>
    <artifactId>demo-tomcat-websocket</artifactId>
    <version>1.0-SNAPSHOT</version>

    <packaging>war</packaging>

    <properties>
        <jsp.version>2.1</jsp.version>
        <servlet.version>3.1.0</servlet.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>javax.servlet.jsp</groupId>
            <artifactId>jsp-api</artifactId>
            <version>${jsp.version}</version>
            <scope>provided</scope>
        </dependency>

        <!-- javax.servlet-api -->
        <dependency>
            <groupId>javax.servlet</groupId>
            <artifactId>javax.servlet-api</artifactId>
            <version>${servlet.version}</version>
            <scope>provided</scope>
        </dependency>

        <!-- https://mvnrepository.com/artifact/javax.websocket/javax.websocket-api -->
        <dependency>
            <groupId>javax.websocket</groupId>
            <artifactId>javax.websocket-api</artifactId>
            <version>1.1</version>
            <scope>provided</scope>
        </dependency>



    </dependencies>

    <build>
        <finalName>0520carrent</finalName>

        <plugins>
            <!-- 配置tomcat的运行插件 -->
            <plugin>
                <groupId>org.apache.tomcat.maven</groupId>
                <artifactId>tomcat7-maven-plugin</artifactId>
                <version>2.2</version>
                <configuration>
                    <!-- 配置端口 -->
                     <port>8080</port>
                    <!-- 配置urlencoding -->
                     <uriEncoding>UTF-8</uriEncoding>
                    <path>/</path>
                </configuration>
            </plugin>

            <!-- 配置jdk的编译版本 -->
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.7.0</version>
                <configuration>
                    <!-- 指定source和target的版本 -->
                    <source>1.8</source>
                    <target>1.8</target>
                </configuration>
            </plugin>
        </plugins>
    </build>

</project>

web.xml

<?xml version="1.0" encoding="UTF-8"?>
<web-app xmlns="http://xmlns.jcp.org/xml/ns/javaee"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://xmlns.jcp.org/xml/ns/javaee
                             http://xmlns.jcp.org/xml/ns/javaee/web-app_3_1.xsd"
         version="3.1">

</web-app>

MyServlet

@WebServlet(urlPatterns = "/my")
public class MyServlet extends HttpServlet {

    @Override
    protected void doGet(HttpServletRequest req, HttpServletResponse resp) throws ServletException, IOException {
    	for (MyWebSocketEndpoint myWebSocketEndpoint : MyWebSocketEndpoint.getConnections()) {
            myWebSocketEndpoint.getSession().getBasicRemote().sendText("halo");
        }
        resp.getWriter().write("hello world");
    }
}

MyWebSocketEndpoint

package com.zzhua.ws;

import javax.websocket.*;
import javax.websocket.server.ServerEndpoint;
import java.io.IOException;
import java.util.Set;
import java.util.concurrent.CopyOnWriteArraySet;
import java.util.concurrent.atomic.AtomicInteger;

@ServerEndpoint(value="/websocket")
public class MyWebSocketEndpoint {

	private static final AtomicInteger counter = new AtomicInteger(0);                                    // 客户端计数器
	private static final Set<MyWebSocketEndpoint> connections = new CopyOnWriteArraySet<MyWebSocketEndpoint>();       // 客户端websocket连接集合
	private Session session = null;                                                                       // WebSocket会话对象
	private Integer number = 0;                                                                           // 客户端编号

	public MyWebSocketEndpoint() {
		number = counter.incrementAndGet();
	}

	/**
	 * 客户端建立websocket连接
	 * @param session
	 */
	@OnOpen
	public void start(Session session) {
		System.out.println("on open");
		this.session = session;
		connections.add(this);
		try {
			session.getBasicRemote().sendText(new StringBuffer().append("Hello: ").append(number).toString());
		} catch (IOException e) {
			e.printStackTrace();
		}
	}

	/**
	 * 客户端断开websocket连接
	 */
	@OnClose
	public void close() {
		System.out.println("session close");
		try {
			this.session.close();
		} catch (IOException e) {
			e.printStackTrace();
		} finally {
			connections.remove(this);
		}
	}

	/**
	 * 接收客户端发送的消息
	 * @param message
	 */
	@OnMessage
	public void message(String message) {
		System.out.println("message: "+ message);
		for(MyWebSocketEndpoint client : connections) {
			synchronized (client) {
				try {
					client.session.getBasicRemote().sendText(message);
				} catch (IOException e) {
					e.printStackTrace();
				}
			}
		}
	}

	@OnError
	public void error(Throwable t) {
		System.out.println("client: error"+ number+t.getMessage());
	}
	
	public static Set<MyWebSocketEndpoint> getConnections() {
        return connections;
    }

    public Session getSession() {
        return session;
    }
}

html的websocket客户端

客户端主要事件和方法

  • new WebSocket 新建websocket连接
  • onopen 监听t连接打开事件
  • onmessage 有消息时触发
  • onclose 连接关闭t时触发
  • onerror 连接出错时触发
  • send 发送消息
  • close 关闭连接
<!DOCTYPE html>
<html lang="en">
<head>
    <meta charset="UTF-8">
    <title>Title</title>
</head>
<body>
    请输入: <input id="content" type="text"/>
    <button onclick="sendMsg()">发送</button>
    <button onclick="closeWebsocket()">关闭</button>

    <ul id="msgList">

    </ul>

</body>
<script>

    /* 参考: https://developer.mozilla.org/zh-CN/docs/Web/API/WebSocket */

    // var socket = new WebSocket('ws://localhost:8080/websocket01');
    var socket = new WebSocket('ws://localhost:8080/websocket/user001');
    // var socket = new WebSocket('ws://[[${ip}]]:8080/websocket01');

    // 指定连接成功后的回调
    socket.onopen = function (event) {
        console.log("建立连接成功")
    }

    // 发送消息给服务器
    function sendMsg() {
        var content = document.querySelector('#content');
        socket.send(content.value) // 使用websocket发送消息到服务器
    }


    // 收到服务器的消息时的回调
    socket.onmessage = function (ev) {
        console.log("收到服务器消息: " + JSON.stringify(ev))
        var ul = document.querySelector('#msgList');
        var li = document.createElement('li');
        li.innerText = ev.data
        ul.appendChild(li)
    }

    // 手动关闭websocket
    function closeWebsocket() {
        socket.close()
    }

    // 指定连接关闭后的回调
    socket.onclose = function (event) {
        console.log("连接关闭")
    }

</script>
</html>

Springboot整合websocket-1(@ServerEndpoint)

引入依赖

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">

    <modelVersion>4.0.0</modelVersion>

    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.1.8.RELEASE</version>
        <relativePath/> <!-- lookup parent from repository -->
    </parent>

    <groupId>com.zzhua</groupId>
    <artifactId>demo-springboot</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <name>demo-springboot</name>

    <description>Demo project for Spring Boot</description>
    <properties>
        <java.version>1.8</java.version>
    </properties>
    <dependencies>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-websocket</artifactId>
        </dependency>

    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>

</project>

WsConfig

@Configuration
public class WsConfig {
    @Bean
    public ServerEndpointExporter serverEndpointExporter(){
        return new ServerEndpointExporter();
    }
}

MyWebSocketEndpoint

package com.zzhua.ws;

import org.springframework.stereotype.Component;

import javax.websocket.*;
import javax.websocket.server.PathParam;
import javax.websocket.server.ServerEndpoint;
import java.io.IOException;
import java.util.Set;
import java.util.concurrent.CopyOnWriteArraySet;
import java.util.concurrent.atomic.AtomicInteger;

/*
 1. @ServerEndpoint用于标记被修饰的类是一个 websocket服务器 的一个端点,且被标注的类必须有一个public的无参构造方法
 2. @ServerEndpoint可以指定 端点发布的路径, 和其它重要的属性,如encoders编码器、decoders解码器
 3. 端点发布的路径可以带路径变量,并且该路径变量名可以在@OnOpen和@OnClose标注的方法的方法参数中使用
    如:发布路径是@ServerEndpoint(value="/websocket/{userId}"),则在@OnOpen修饰的方法中可以使用@PathParam("userId"),
 4. @ServerEndpoint允许指定自定义配置器 ServerEndpointConfig.Configurator
 */
@Component
@ServerEndpoint(value="/websocket/{userId}/{username}")
public class MyWebSocketEndpoint {

	private static final AtomicInteger counter = new AtomicInteger(0);                                    // 客户端计数器
	private static final Set<MyWebSocketEndpoint> connections = new CopyOnWriteArraySet<MyWebSocketEndpoint>();       // 客户端websocket连接集合
	private Session session = null;                                                                       // WebSocket会话对象
	private Integer number = 0;                                                                           // 客户端编号

	public MyWebSocketEndpoint() {
		number = counter.incrementAndGet();
	}

	/**
	 * 客户端建立websocket连接
	 * @param session
	 */
	@OnOpen
	public void start(Session session, @PathParam("userId") String userId, @PathParam("username") String username) {
		System.out.println("on open");
		this.session = session;
		connections.add(this);
		try {
			session.getBasicRemote().sendText(new StringBuffer().append("Hello: ").append(number).toString());
		} catch (IOException e) {
			e.printStackTrace();
		}
	}

	/**
	 * 客户端断开websocket连接
	 */
	@OnClose
	public void close(@PathParam("userId") String userId) {
		System.out.println("session close");
		try {
			this.session.close();
		} catch (IOException e) {
			e.printStackTrace();
		} finally {
			connections.remove(this);
		}
	}

	/**
	 * 接收客户端发送的消息
	 * @param message
	 */
	@OnMessage
	public void message(String message,@PathParam("userId") String userId,Session session) {
		System.out.println("message: "+ message);
		for(MyWebSocketEndpoint client : connections) {
			synchronized (client) {
				try {
					client.session.getBasicRemote().sendText(message);
				} catch (IOException e) {
					e.printStackTrace();
				}
			}
		}
	}

	@OnError
	public void error(Throwable t) {
		System.out.println("client: error"+ number+t.getMessage());
	}

    public static Set<MyWebSocketEndpoint> getConnections() {
        return connections;
    }

    public Session getSession() {
        return session;
    }
}

html的websocket客户端

<!DOCTYPE html>
<html lang="en">
<head>
    <meta charset="UTF-8">
    <title>Title</title>
</head>
<body>
请输入: <input id="content" type="text"/>
<button onclick="sendMsg()">发送</button>
<button onclick="closeWebsocket()">关闭</button>

<ul id="msgList">

</ul>

</body>
<script>

    /* 参考: https://developer.mozilla.org/zh-CN/docs/Web/API/WebSocket */

    // var socket = new WebSocket('ws://localhost:8080/websocket01');
    var socket = new WebSocket('ws://localhost:8080/websocket/user001/username001');
    // var socket = new WebSocket('ws://[[${ip}]]:8080/websocket01');

    // 指定连接成功后的回调
    socket.onopen = function (event) {
        console.log("建立连接成功")
    }

    // 发送消息给服务器
    function sendMsg() {
        var content = document.querySelector('#content');
        socket.send(content.value) // 使用websocket发送消息到服务器
    }


    // 收到服务器的消息时的回调
    socket.onmessage = function (ev) {
        console.log("收到服务器消息: " + JSON.stringify(ev))
        var ul = document.querySelector('#msgList');
        var li = document.createElement('li');
        li.innerText = ev.data
        ul.appendChild(li)
    }

    // 手动关闭websocket
    function closeWebsocket() {
        socket.close()
    }

    // 指定连接关闭后的回调
    socket.onclose = function (event) {
        console.log("连接关闭")
    }

</script>
</html>

这样建立websocket连接是比较容易,但是不能所有客户端都能连接服务端吧,比如只有登录了,才能成功建立websocket连接,这样才比较合理。如果采用这种整合方式,就必须知道它建立ws连接的过程。它是依靠WsFilter过滤器拦截所有请求,判断是不是切换ws协议,如果是,则升级协议,完成握手。所以,
1、要么我们重写WsFilter的doFilter,先进行登录状态验证,再调用父类。
2、要么在WsFilter前,再插入一个过滤器,验证通过了,再交给WsFilter处理,这样WsFilter它就不用操心认证的事了。
3、要么配置一个自定义的ServerEndpointConfig.Configurator,重写里面的modifyHandshake方法,这里面是可以拿到请求相关的信息的。但是不方便的是,这个自定义配置器对象,是通过反射创建的,并不是由spring管理的,虽然我们可以通过拿到各种手段拿到spring容器的方式实现解耦,这是能想到的最简单的方式了。

Springboot整合websocket-2(@EnableWebsocket)

看这个具体的实现之前,肯定是需要先看懂上面的实现的具体过程的,也就是说,上面相当于是使用原生的tomcat,只不过springboo帮我们注册端点类。而这里,spring抽象出了处理websocket的接口,屏蔽了各种web容器间的实现差异,里面也用到了很多的装饰者模式,不得不佩服spring的抽象能力哇。难怪之前我看的懵逼,会想:为什么有的时候上面这样写,有的时候下面那样写?到底谁优谁劣?它们实现一不一样?这2种写法能不能一起使用?谁优谁劣,一眼就可以看出来了,当然是下面这种更好啦,它又回归到我们熟悉的spring容器咯~

而且这里面实现也是参考了@EnableWebMvc一样的配置原理,并且处理过程也借鉴了springmvc的处理流程,它的握手流程也是通过springmvc的HttpRequestHandler类型的处理器(WebSocketHttpRequestHandler)介入交给握手处理器AbstractHandshakeHandler,握手处理器里面通过协议升级策略方式来屏蔽不同web容器的差异。

源码初探

在查看源码的时候,上面这种方式和下面这种方式,握手后的协议升级最终都会到UpgradeUtil#doUpgrade这个方法,这个比较关键,在这个doUpgrade方法里,会调用HttpServletRequest#upgrade(WsHttpUpgradeHandler.class)创建一个WsHttpUpgradeHandler对象(在协议升级后,接收到消息时,也是用这个对象处理的),
源码还可以留意到:AbstractProtocol$ConnectionHandler(静态内部类)#process方法和connections属性。在process里面,由“不变的processor”处理请求,如果是协议升级请求,则处理后会返回UPGRADING状态,这其中就包含了协议升级过程,processor中可以得到UpgradeToken,然后创建出一个新的processor放入connections中,再从UpgradeTken中获取httpUpgradeHandler初始化这个新的processor。由于connections中的key就是socket,当socket中有数据来了,就可以找到对应的processor,交给此processor处理了。并且 我们还注意这里面是有个while循环的,跳出的条件是返回的状态不为SocketState.UPGRADING,所以此时新的processor会立马执行一遍processor.process(wrapper, status),相当于前面这一步是在握手,握手之后的这一步是在通知连接建立成功,debug的时候,注意这一点就好了

引入依赖

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">

    <modelVersion>4.0.0</modelVersion>

    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.1.8.RELEASE</version>
        <relativePath/> <!-- lookup parent from repository -->
    </parent>

    <groupId>com.zzhua</groupId>
    <artifactId>demo-springboot</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <name>demo-springboot</name>

    <description>Demo project for Spring Boot</description>
    <properties>
        <java.version>1.8</java.version>
    </properties>
    <dependencies>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-websocket</artifactId>
        </dependency>

    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>

</project>

WebsocketConfig

@Configuration
@EnableWebSocket
public class WebsocketConfig implements WebSocketConfigurer {

    public static Map<String, WebSocketSession> sessionMap = new ConcurrentHashMap<>();

    @Override
    public void registerWebSocketHandlers(WebSocketHandlerRegistry registry) {
        registry
                .addHandler(new TextWebSocketHandler() { // WebSocketHandler接口
                    @Override
                    public void afterConnectionEstablished(WebSocketSession session) throws Exception {
                        System.out.println("建立websocket连接后的回调," + session + "," + this + RequestContextHolder.getRequestAttributes());
                        // 存入sessionMap,实际应该关联到用户
                        String name = (String) session.getAttributes().get("name");
                        System.out.println("与: {}建立链接成功" + name);
                        sessionMap.put(name, session);
                    }

                    @Override
                    public void handleMessage(WebSocketSession session, WebSocketMessage<?> message) throws Exception {
                        System.out.println("收到websocket消息," + session + "," + message+ RequestContextHolder.getRequestAttributes());
                    }

                    @Override
                    public void afterConnectionClosed(WebSocketSession session, CloseStatus status) throws Exception {
                        System.out.println("websocket连接关闭后回调" + "," + status + RequestContextHolder.getRequestAttributes());
                    }

                    @Override
                    public void handleTransportError(WebSocketSession session, Throwable exception) throws Exception {
                        System.out.println("连接发生错误被回调");
                    }
                }, "/websocket01")
                .addInterceptors(new HandshakeInterceptor() { // HandshakeInterceptor接口
                    @Override // 这里回传的ServerHttpRequest(实现类为ServletServerHttpRequest)包裹了HttpServletRequest
                    public boolean beforeHandshake(ServerHttpRequest request, ServerHttpResponse response, WebSocketHandler wsHandler, Map<String, Object> attributes) throws Exception {
                        System.out.println("握手前的回调," + wsHandler + ","+ attributes + "," + RequestContextHolder.getRequestAttributes());
                        // 也可以从session中获取用户信息(也可以使用请求头放token,只要能获取到用户都ok)
                        String name = ((ServletServerHttpRequest) request).getServletRequest().getParameter("name");
                        System.out.println("握手拦截器,获取name: {}"+name);
                        synchronized (this) {
                            if (!sessionMap.containsKey(name)) {
                                attributes.put("name", name);
                                System.out.println("sessionMap中不包含此name:{},允许建立链接"+name);
                                return true;
                            }
                            System.out.println("sessionMap中已包含此name:{}"+name);
                            return false;
                        }
                    }

                    @Override
                    public void afterHandshake(ServerHttpRequest request, ServerHttpResponse response, WebSocketHandler wsHandler, Exception exception) {
                        System.out.println("握手后回调, " + "," + wsHandler + "," + exception + RequestContextHolder.getRequestAttributes());
                    }
                })
                .setHandshakeHandler(new MyHandShakeHandler()) // HandshakeHandler
                .setAllowedOrigins("*")
                // 开启sockJs,记得把websocketEnabled也给开启了,否则客户端只能使用sockJs,而不能使用websocket
                // .withSockJS()
                // .setWebSocketEnabled(true)
        ;
    }

    static class MyHandShakeHandler implements HandshakeHandler {
        // 借助Spring提供的DefaultHandshakeHandler完成握手(仅仅包裹了一下spring封装的DefaultShakeHandler)
        private DefaultHandshakeHandler defaultHandshakeHandler = new DefaultHandshakeHandler();

        @Override
        public boolean doHandshake(ServerHttpRequest request, ServerHttpResponse response, WebSocketHandler wsHandler, Map<String, Object> attributes) throws HandshakeFailureException {
            System.out.println("开始握手..." + RequestContextHolder.getRequestAttributes());
            boolean flag = defaultHandshakeHandler.doHandshake(request, response, wsHandler, attributes);
            System.out.println("握手完成..." + flag);
            return flag;
        }
    }

}

html的websocket客户端

<!DOCTYPE html>
<html lang="en">
<head>
    <meta charset="UTF-8">
    <title>Title</title>
</head>
<body>
请输入: <input id="content" type="text"/>
<button onclick="sendMsg()">发送</button>
<button onclick="closeWebsocket()">关闭</button>

<ul id="msgList">

</ul>

</body>
<script>

    /* 参考: https://developer.mozilla.org/zh-CN/docs/Web/API/WebSocket */

    // var socket = new WebSocket('ws://localhost:8080/websocket01');
    // var socket = new WebSocket('ws://localhost:8080/websocket/user001/username001');
    var socket = new WebSocket('ws://localhost:8080/websocket01?name=zzhua');
    // var socket = new WebSocket('ws://[[${ip}]]:8080/websocket01');

    // 指定连接成功后的回调
    socket.onopen = function (event) {
        console.log("建立连接成功")
    }

    // 发送消息给服务器
    function sendMsg() {
        var content = document.querySelector('#content');
        socket.send(content.value) // 使用websocket发送消息到服务器
    }


    // 收到服务器的消息时的回调
    socket.onmessage = function (ev) {
        console.log("收到服务器消息: " + JSON.stringify(ev))
        var ul = document.querySelector('#msgList');
        var li = document.createElement('li');
        li.innerText = ev.data
        ul.appendChild(li)
    }

    // 手动关闭websocket
    function closeWebsocket() {
        socket.close()
    }

    // 指定连接关闭后的回调
    socket.onclose = function (event) {
        console.log("连接关闭")
    }

</script>
</html>

实际应用时的注意点

这一块已经梳理好了,可以直接参考这篇:Spring整合tomcat的WebSocket详细逻辑(图解)

携带id或token

websocket不可能让所有的客户端都能随随便便就能连上,需要携带token或者id

  • 直接带在url的query上
  • 带在请求头上(利用WebSocket的构造参数的第二个参数-子协议(视为token)-可参考:学习风`宇blog的websocket模块
  • cookie

心跳检测

我们需要定期向websocket发送消息,看有没有回信(每隔半分钟,向服务器发送一个消息)

定时重连

如果链接出错或者中断,我们需要重连,并且也不能无限制的重连(达到最大重连次数后,给出错误提示)

nginx配置WebSocket

Nginx配置WebSocket

server{
	   # 监听的端口号
       listen      9095;
       server_name robotchat.lukeewin.top; # 这里填写的是访问的域名
       location / {
           proxy_pass http://127.0.0.1:9090; # 这里填写的是代理的路径和端口
           proxy_set_header Host $host;
           proxy_set_header X-Real_IP $remote_addr;
           proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
       }
       
       # 以下配置针对websocket
       location /ws { # onlineCount为websocket的访问uri
           proxy_redirect off;
           proxy_pass http://127.0.0.1:9090;
           proxy_set_header Host $host;
           proxy_set_header X-Real_IP $remote_addr;
           proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
           proxy_http_version 1.1;
           proxy_read_timeout 36000s;
           proxy_send_timeout 36000s;
           proxy_set_header Upgrade $http_upgrade;   # 升级协议头 websocket
           proxy_set_header Connection "upgrade";
       }
}

如果没写如下语句,则会报EOFException

proxy_read_timeout 36000s;
proxy_send_timeout 36000s;

在这里插入图片描述
添加如下三行语句,才能在后台中拿到真实的ip地址。

proxy_set_header Host $host;
proxy_set_header X-Real_IP $remote_addr;
proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;

获取真实ip地址代码如下。

public String getRealIp(HttpServletRequest request) {
        String ip = request.getHeader("X-Forwarded-For");
        if (StringUtils.isNotEmpty(ip) && !"unKnown".equalsIgnoreCase(ip)) {
            int index = ip.indexOf(",");
            if (index != -1) {
                return ip.substring(0, index);
            } else {
                return ip;
            }
        }
        ip = request.getHeader("X-Real-IP");
        if (StringUtils.isNotEmpty(ip) && !"unKnown".equalsIgnoreCase(ip)) {
            return ip;
        }
        return request.getRemoteAddr();
}

spring-websocket+Java客户端的实现

1.业务场景描述

这个案例主要是用来监控用的,就是把接口中的数据实时的展示到页面上,比如一个下订单的接口,在外界访问订单接口时,订单存入到数据库后需要将订单数据展示到页面上。因此就可以在接口中使用Java客户端将订单信息发送到websocket服务器,然后页面作为客户端把接口订单数据暂时到页面上。在接口中将Java客户端的连接传入接口中,直接通过连接发送订单信息到服务器。连接websocket的客户端代码通过静态代码块的方式加载,然后将建立连接后的websocket连接存到静态变量,在接口中通过静态变量,就可以获取websocket连接,进而达到在接口中发送订单信息的目的。

2.所需jar包

服务端:spring-websocket,spring-messaging

<dependency>
    <groupId>org.springframework</groupId>
    <artifactId>spring-websocket</artifactId>
    <version>${spring.version}</version>
</dependency>

<dependency>
    <groupId>org.springframework</groupId>
    <artifactId>spring-messaging</artifactId>
    <version>${spring.version}</version>
</dependency>

客户端:javax.websocket-api,tyrus-standalone-client

<dependency>
    <groupId>javax.websocket</groupId>
    <artifactId>javax.websocket-api</artifactId>
    <version>1.1</version>
</dependency>

<dependency>
    <groupId>org.glassfish.tyrus.bundles</groupId>
    <artifactId>tyrus-standalone-client</artifactId>
    <version>1.9</version>
</dependency>

3.websocket配置文件

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.annotation.EnableAsync;
import org.springframework.web.servlet.config.annotation.EnableWebMvc;
import org.springframework.web.servlet.config.annotation.WebMvcConfigurerAdapter;
import org.springframework.web.socket.WebSocketHandler;
import org.springframework.web.socket.config.annotation.EnableWebSocket;
import org.springframework.web.socket.config.annotation.WebSocketConfigurer;
import org.springframework.web.socket.config.annotation.WebSocketHandlerRegistry;
 
/**
 * websocket配置文件
 * 有了该配置文件,就不用在spring-mvc.xml中进行websocket的配置
 * EnableWebSocket注解 :开启websocket服务
 * @author cm
 */
@Configuration
@EnableWebMvc
@EnableWebSocket
public class WebSocketConfig extends WebMvcConfigurerAdapter implements WebSocketConfigurer {
 
    /**
     *添加处理器和拦截器,处理器后面的地址就是websocket的访问路径
     * setAllowedOrigins:指定的域名或IP,如果不限制使用"*"就可以了
     * @param registry
     */
    @Override
    public void registerWebSocketHandlers(WebSocketHandlerRegistry registry) {
        registry.addHandler(myWebSocketHandler(), "/websocket/demo")
                .addInterceptors(myInterceptor()).setAllowedOrigins("*");
 
    }
 
    /**
     * 直接注入自己定义的websocket处理器
     * @return
     */
    @Bean
    public WebSocketHandler myWebSocketHandler(){
        return new MyWebSocketHandler();
    }
 
    /**
     * 直接注入自己定义的websocket拦截器
     * @return
     */
    @Bean
    public WebSocketHandshakeInterceptor myInterceptor(){
        return new WebSocketHandshakeInterceptor();
    }
}

4.websocket拦截器


import org.springframework.http.server.ServerHttpRequest;
import org.springframework.http.server.ServerHttpResponse;
import org.springframework.web.socket.WebSocketHandler;
import org.springframework.web.socket.server.HandshakeInterceptor;
 
import java.util.Map;
 
/**
 * websocket拦截器:一般情况下不做处理
 */
public class WebSocketHandshakeInterceptor implements HandshakeInterceptor {
 
    @Override
    public void afterHandshake(ServerHttpRequest arg0, ServerHttpResponse arg1, WebSocketHandler arg2, Exception arg3) {
 
    }
 
    @Override
    public boolean beforeHandshake(ServerHttpRequest arg0, ServerHttpResponse arg1, WebSocketHandler arg2,
                                   Map<String, Object> arg3) throws Exception {
        return true;
    }
 
}

5.websocket处理器

import org.springframework.scheduling.annotation.Async;
import org.springframework.web.socket.*;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.*;
 
/**
 * websocket处理器:功能实现的核心代码编写类
 * @author cm
 */
public class MyWebSocketHandler implements WebSocketHandler{
 
    /**
     * 定义一个全局的初始化值count=0,记录连接数
     */
    private static int count = 0;
 
    /**
     * 记录所有的客户端连接
     */
 
    private volatile static List<WebSocketSession> sessions = Collections.synchronizedList(new ArrayList());
 
    
    @Override
    public void afterConnectionClosed(WebSocketSession session, CloseStatus closeStatus) throws Exception {
        sessions.remove(session);
        if (session.isOpen()){
            session.close();
        }
        count = count-1;
        System.out.println(count);
    }
 
    /**
     * 建立连接后的操作
     * @param session
     * @throws Exception
     */
    @Override
    public void afterConnectionEstablished(WebSocketSession session) throws Exception {
        count++;
        sessions.add(session);
 
    }
 
    /**
     * 消息处理,在客户端通过Websocket API发送的消息会经过这里,然后进行相应的处理
     * @param session
     * @param message
     * @throws Exception
     */
    @Override
    public void handleMessage(WebSocketSession session, WebSocketMessage<?> message) throws Exception {
        String data = message.getPayload().toString();
        System.out.println("客户端" + message.getPayload().toString());
        if(!data.equals("B")) {
            sessions.remove(session);
            try {
                 for (WebSocketSession ws : sessions) {
                     if (ws.isOpen()) {
                         synchronized (ws) {
                            ws.sendMessage(new TextMessage("" + data));
                            System.out.println("服务端" + data);
                         }
 
                      }
                 }
            } catch (IOException e) {
                e.printStackTrace();
             
            }
        }
    }
 
    /**
     * 消息传输错误处理
     * @param session
     * @param throwable
     * @throws Exception
     */
    @Override
    public void handleTransportError(WebSocketSession session, Throwable throwable) throws Exception {
        if(session.isOpen()){
            sessions.remove(session);
            session.close();
        }
    }
 
    @Override
    public boolean supportsPartialMessages() {
        return false;
    }
 
}

6.Java客户端代码​

import javax.websocket.*;
 
@ClientEndpoint
public class Client{
 
    @OnOpen
    public void onOpen(Session session) {
        System.out.println("Connected to endpoint: " + session.getBasicRemote());
    }
 
    @OnMessage
    public void onMessage(String message) {
        System.out.println(message);
    }
 
    @OnError
    public void onError(Throwable t) {
        t.printStackTrace();
    }
 
}

7.静态代码块

import javax.websocket.ContainerProvider;
import javax.websocket.DeploymentException;
import javax.websocket.Session;
import javax.websocket.WebSocketContainer;
import java.io.IOException;
import java.net.URI;
 
 
 
public class TestClient {
 
    public static Session session = null;
 
    static{
        WebSocketContainer container = ContainerProvider.getWebSocketContainer();
        String uri ="ws://localhost:8080/websocket/demo";
        System.out.println("Connecting to 2"+ uri);
        try {
            session = container.connectToServer(Client.class,URI.create(uri));
        } catch (DeploymentException e) {
            e.printStackTrace();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

代码经过测试,可用~

8.接口中调用

TestClient.session.getBasicRemote().sendText("hello");

说明:以上3.4.5为服务端代码,6.7.8为客户端代码

Logo

权威|前沿|技术|干货|国内首个API全生命周期开发者社区

更多推荐