Spring Cloud Gateway 管理 Netty : https://editor.csdn.net/md/?articleId=105424618
上篇文章中简单概述了网关路由长连接的思路,
这篇文章来贴一下具体代码。

  1. Java 通过 Netty WebSocket 客户端连接 Netty
  2. VUE 通过 WebSocket 连接 Netty

提要:

本文中有关 Netty、Client 以及 Socket 的代码仅作为网关路由长连接的示例;若不经过网关路由,请查看直连文档,以免出现无法预料的问题。

首先是 Netty - WebSocket 服务端的搭建
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.http.HttpObjectAggregator;
import io.netty.handler.codec.http.HttpServerCodec;
import io.netty.handler.codec.http.websocketx.WebSocketServerProtocolHandler;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;
import io.netty.handler.stream.ChunkedWriteHandler;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Configuration;

import java.net.InetSocketAddress;

/**
 * Netty 服务端
 * @author cuixiaojian
 */
@Slf4j
@Configuration
public class NettyServer {

	/**
     * Netty 端口号
     */
    @Value("${netty.port}")
    private Integer nettyPort;

    /**
     * 启动方法
     * @throws  Exception
     */
    public void startServer() throws Exception {

        // Netty 线程组
        EventLoopGroup bossGroup = new NioEventLoopGroup();
        EventLoopGroup wokerGroup = new NioEventLoopGroup();

        try{
            // Netty 启动类
            ServerBootstrap serverBootstrap = new ServerBootstrap();
            serverBootstrap.group(bossGroup,wokerGroup).channel(NioServerSocketChannel.class)
                    .handler(new LoggingHandler(LogLevel.INFO))
                    .option(ChannelOption.SO_KEEPALIVE,true)
                    .option(ChannelOption.SO_BACKLOG,1024 * 1024 * 10)

                    // 设置处理器
                    .childHandler(new ChannelInitializer<SocketChannel>(){
                        @Override
                        protected void initChannel(SocketChannel socketChannel) throws Exception {

                            ChannelPipeline pipeline = socketChannel.pipeline();

                            pipeline.addLast("http-codec",new HttpServerCodec());
                            pipeline.addLast("http-chunked",new ChunkedWriteHandler());
                            pipeline.addLast("aggregator",new HttpObjectAggregator(1024*1024*1024));

                            pipeline.addLast(new WebSocketServerProtocolHandler("/notice",null,true,65535));

                            // 自定义处理器
                            pipeline.addLast(new WebSocketHandle());

                        }
                    });

            ChannelFuture channelFuture = serverBootstrap.bind(new InetSocketAddress(nettyPort)).sync();
            if (channelFuture.isSuccess()) {
                log.info("Netty服务端启动成功, 端口号:{}",nettyPort);
            }
            channelFuture.channel().closeFuture().sync();
        } catch (Exception e) {
            log.error("Netty服务端启动失败,异常信息为: {}",e);
        } finally {
            bossGroup.shutdownGracefully();
            wokerGroup.shutdownGracefully();
        }

    }

}
自定义处理器
import com.xnyzc.lhy.common.util.CheckUtil;
import com.xnyzc.lhy.notice.entity.message.SysSocketUser;
import com.xnyzc.lhy.notice.netty.param.NettyMessage;
import com.xnyzc.lhy.notice.netty.util.NettyMsg;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.channel.group.ChannelGroup;
import io.netty.channel.group.DefaultChannelGroup;
import io.netty.handler.codec.http.websocketx.*;
import io.netty.util.concurrent.GlobalEventExecutor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.context.annotation.Configuration;

import java.net.InetSocketAddress;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

/**
 * 自定义的handler类
 * @author cuixiaojian
 */
@Slf4j
@Configuration
public class WebSocketHandle extends SimpleChannelInboundHandler<Object> {

    //客户端组
    public static ChannelGroup channelGroup;

    static {
        channelGroup = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);
    }

    /**
     * 存储 Channel 的容器
     */
    private static ConcurrentMap<String, Channel> channelMap = new ConcurrentHashMap<>();


    /**
     * Handler活跃状态,连接成功
     */
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        // 将通道放入组
        channelGroup.add(ctx.channel());
    }

    /**
     * 通道读取
     */
    @Override
    protected void channelRead0(ChannelHandlerContext ctx, Object msg) throws Exception {

        // 文本消息 (约定 与客户端 Socket 消息类型为文本消息)
        if (msg instanceof TextWebSocketFrame) {
            // 获取当前channel绑定的IP地址
            InetSocketAddress ipSocket = (InetSocketAddress) ctx.channel().remoteAddress();
            String address = ipSocket.getAddress().getHostAddress();
            log.info("获取到远程连接:IP:{}", address);

            TextWebSocketFrame textFrame = (TextWebSocketFrame) msg;
            String message = textFrame.text();
            log.info("收到来自客户端消息: {}", message);

			// 此处 NettyMsg 类为自定义消息解析器,通过 message 串来解析消息。
            NettyMessage nettyMessage = NettyMsg.getNettyMessage(message);
            // 解析成为自定义消息对象
            SysSocketUser sysSocketUser = NettyMsg.getCon(nettyMessage.getCon(), SysSocketUser.class);

			// 与客户端约定建立连接成功时,发送一条包含用户ID的消息,此处从消息中获取用户ID与Channel绑定,后续为用户发送消息时使用。
            if (CheckUtil.objIsNotEmpty(sysSocketUser)) {
                if (CheckUtil.objIsNotEmpty(sysSocketUser.getUserId())) {
                    //将 用户 和 Channel 的关系保存
                    if (!channelMap.containsKey(sysSocketUser.getUserId())) {
                        channelMap.put(sysSocketUser.getUserId(), ctx.channel());
                    }
                }
            }

        }
        // PING 类型消息
        if (msg instanceof PongWebSocketFrame) {
            log.info("PING SUCCESS");
        }
        // 请求关闭连接类型消息
        if (msg instanceof CloseWebSocketFrame) {
            log.info("客户端关闭连接,服务端关闭通道");
            Channel channel = ctx.channel();
            channel.close();
        }
    }

    /**
     * 未注册状态
     */
    @Override
    public void channelUnregistered(ChannelHandlerContext ctx) throws Exception {
        log.info("等待连接");
    }

    /**
     * 非活跃状态,没有连接远程主机的时候。
     */
    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        log.info("客户端关闭");
        channelGroup.remove(ctx.channel());
    }

    /**
     * 异常处理
     */
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        log.error("连接异常:" + cause.getMessage());
        ctx.close();
    }

    /**
     * 给指定用户发内容
     * 可以使用此方法推送消息给客户端
     */
    public void sendMessage(String userId, String message) {
        Channel channel = channelMap.get(userId);
        channel.writeAndFlush(new TextWebSocketFrame(message));
    }

    /**
     * 群发消息
     */
    public void sendMessageAll(String message) {
        channelGroup.writeAndFlush(new TextWebSocketFrame(message));
    }
}
Spring Boot 启动后一并启动 Netty 服务
/**
 * Application entry point: boots the Spring context and then starts the Netty server.
 *
 * <p>NOTE(review): no {@code @SpringBootApplication} annotation or import lines are
 * visible in this snippet - confirm they are present in the real source file.
 */
public class NoticeApplication {

    public static void main(String[] args) {
        ApplicationContext context = SpringApplication.run(NoticeApplication.class, args);
        NettyServer nettyServer = context.getBean(NettyServer.class);
        try {
            nettyServer.startServer();
        } catch (Exception e) {
            // Report the cause instead of discarding it, so a failed start can be diagnosed
            System.err.println("netty 启动失败: " + e);
            e.printStackTrace();
        }
    }

}
其次是客户端与Gateway建立连接
1. Java 客户端连接
1. 客户端服务
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.http.DefaultHttpHeaders;
import io.netty.handler.codec.http.HttpClientCodec;
import io.netty.handler.codec.http.HttpHeaders;
import io.netty.handler.codec.http.HttpObjectAggregator;
import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
import io.netty.handler.codec.http.websocketx.WebSocketClientHandshaker;
import io.netty.handler.codec.http.websocketx.WebSocketClientHandshakerFactory;
import io.netty.handler.codec.http.websocketx.WebSocketVersion;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;
import lombok.extern.slf4j.Slf4j;

import java.net.URI;

/**
 * Netty WebSocket client (demo).
 *
 * <p>Connects to the gateway route, performs the WebSocket handshake, sends one
 * registration message and then stays connected until the channel is closed.
 *
 * @author cuixiaojian
 */
@Slf4j
public class NettyClient {

    public static void main(String[] args) throws Exception {

        // Event loop group driving the client channel
        EventLoopGroup group = new NioEventLoopGroup();

        try {
            Bootstrap boot = new Bootstrap();

            boot.group(group)
                    .channel(NioSocketChannel.class)
                    .option(ChannelOption.SO_KEEPALIVE, true)
                    .option(ChannelOption.TCP_NODELAY, true)
                    // Bootstrap keeps a single handler: a second handler(...) call replaces the
                    // first one, so the logging handler is installed inside the pipeline instead.
                    .handler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel socketChannel) throws Exception {
                            ChannelPipeline pipeline = socketChannel.pipeline();
                            pipeline.addLast("logging", new LoggingHandler(LogLevel.INFO));
                            pipeline.addLast("http-codec", new HttpClientCodec());
                            pipeline.addLast("aggregator", new HttpObjectAggregator(1024 * 1024 * 10));
                            pipeline.addLast("hookedHandler", new WebSocketClientHandler());
                        }
                    });

            // This path is the route path configured in the gateway
            URI webSocketURI = new URI("ws://localhost:30000/nio/v1");
            HttpHeaders httpHeaders = new DefaultHttpHeaders();

            // Handshaker for WebSocket protocol version 13
            WebSocketClientHandshaker handshaker = WebSocketClientHandshakerFactory.newHandshaker(
                    webSocketURI, WebSocketVersion.V13, (String) null, true, httpHeaders);

            // Open the TCP connection
            final Channel channel = boot.connect(webSocketURI.getHost(), webSocketURI.getPort()).sync().channel();
            WebSocketClientHandler handler = (WebSocketClientHandler) channel.pipeline().get("hookedHandler");

            handler.setHandshaker(handshaker);
            handshaker.handshake(channel);

            // Block until the handshake has finished (throws if it failed)
            handler.handshakeFuture().sync();

            // Once connected, this method can be called repeatedly to send messages to the server
            sendMessage(channel);

            // Stay connected to receive pushes; continue only when the channel is closed
            channel.closeFuture().sync();
        } finally {
            // Release the event loop threads on every exit path, including connect/handshake failures
            group.shutdownGracefully();
        }
    }

    /**
     * Sends the registration message (a text frame) and prints the outcome of the write.
     */
    private static void sendMessage(Channel channel) {
        // NettyMsg is the project's message helper; setJsonMsg builds a JSON string message
        String putMessage = NettyMsg.setJsonMsg("NOTICE", "{\"userId\": \"123456\"}");

        TextWebSocketFrame frame = new TextWebSocketFrame(putMessage);
        channel.writeAndFlush(frame).addListener(new ChannelFutureListener() {
            @Override
            public void operationComplete(ChannelFuture channelFuture) throws Exception {
                if (channelFuture.isSuccess()) {
                    System.out.println("消息发送成功,发送的消息是:" + putMessage);
                } else {
                    System.out.println("消息发送失败 " + channelFuture.cause().getMessage());
                }
            }
        });
    }

}
2. 客户端处理器
import io.netty.channel.*;
import io.netty.handler.codec.http.FullHttpResponse;
import io.netty.handler.codec.http.websocketx.*;
import io.netty.util.CharsetUtil;

/**
 * @author cuixiaojian
 */
public class WebSocketClientHandler extends SimpleChannelInboundHandler<Object> {

    /**
     * 握手的状态信息
     */
    private WebSocketClientHandshaker handshaker;

    /**
     * netty自带的异步处理
     */
    private ChannelPromise handshakeFuture;

    @Override
    protected void channelRead0(ChannelHandlerContext ctx, Object msg) throws Exception {
        System.out.println("当前握手的状态"+this.handshaker.isHandshakeComplete());
        Channel ch = ctx.channel();
        FullHttpResponse response;
        // 进行握手操作
        if (!this.handshaker.isHandshakeComplete()) {
            try {
                response = (FullHttpResponse)msg;
                // 握手协议返回,设置结束握手
                this.handshaker.finishHandshake(ch, response);
                // 设置成功
                this.handshakeFuture.setSuccess();
                System.out.println("服务端的消息"+response.headers());
            } catch (WebSocketHandshakeException var7) {
                FullHttpResponse res = (FullHttpResponse)msg;
                String errorMsg = String.format("握手失败,status:%s,reason:%s", res.status(), res.content().toString(CharsetUtil.UTF_8));
                this.handshakeFuture.setFailure(new Exception(errorMsg));
            }
        } else if (msg instanceof FullHttpResponse) {
            response = (FullHttpResponse)msg;
            throw new IllegalStateException("Unexpected FullHttpResponse (getStatus=" + response.status() + ", content=" + response.content().toString(CharsetUtil.UTF_8) + ')');
        } else {

            // 接收服务端的消息
            WebSocketFrame frame = (WebSocketFrame)msg;
            // 文本信息
            if (frame instanceof TextWebSocketFrame) {
                TextWebSocketFrame textFrame = (TextWebSocketFrame)frame;
                System.out.println("客户端接收的消息是:"+textFrame.text());
            }
            // 二进制信息
            if (frame instanceof BinaryWebSocketFrame) {
                BinaryWebSocketFrame binFrame = (BinaryWebSocketFrame)frame;
                System.out.println("BinaryWebSocketFrame");
            }
            // PING信息
            if (frame instanceof PongWebSocketFrame) {
                System.out.println("WebSocket Client received pong");
            }
            // 关闭消息
            if (frame instanceof CloseWebSocketFrame) {
                System.out.println("receive close frame");
                ch.close();
            }

        }
    }

    /**
     * Handler活跃状态,表示连接成功
     */
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        System.out.println("与服务端连接成功");
    }

    /**
     * 非活跃状态,没有连接远程主机的时候。
     */
    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        System.out.println("主机关闭");
    }

    /**
     * 异常处理
     */
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        System.out.println("连接异常:"+cause.getMessage());
        ctx.close();
    }

    @Override
    public void handlerAdded(ChannelHandlerContext ctx) {
        this.handshakeFuture = ctx.newPromise();
    }

    public WebSocketClientHandshaker getHandshaker() {
        return handshaker;
    }

    void setHandshaker(WebSocketClientHandshaker handshaker) {
        this.handshaker = handshaker;
    }

    public ChannelPromise getHandshakeFuture() {
        return handshakeFuture;
    }

    public void setHandshakeFuture(ChannelPromise handshakeFuture) {
        this.handshakeFuture = handshakeFuture;
    }

    ChannelFuture handshakeFuture() {
        return this.handshakeFuture;
    }
2. VUE 客户端连接
	// 初始化weosocket
	initWebSocket() { 
	  // 设置网关路由地址
      const wsuri = 'ws://127.0.0.1:30000/nio/v1'
      this.websock = new WebSocket(wsuri)
      this.websock.onmessage = this.websocketonmessage
      this.websock.onopen = this.websocketonopen
      this.websock.onerror = this.websocketonerror
      this.websock.onclose = this.websocketclose
    },
    // 连接建立之后执行send方法发送数据
    websocketonopen() {
      const actions = { 'code': '0', 'method': 'NOTICE', 'msg': '', 'con': '{\'userId\':\'1\'}' }
      this.websocketsend(JSON.stringify(actions))
    },
    // 连接建立失败重连
    websocketonerror() { 
      // this.initWebSocket()
    },
    // 数据接收
    websocketonmessage(e) { 
      // 解析为Json消息
      // const redata = JSON.parse(e.data)
      // element-ui 通知组件
      this.$notify({
        title: '通知',
        message: '您有一条新消息'
      })
      // 自定义接收到消息后的处理
      this.getMessage()
    },
    // 数据发送
    websocketsend(Data) { 
      this.websock.send(Data)
    },
    // 关闭
    websocketclose(e) { 
      console.log('断开连接', e)
    }
在 mounted 钩子中初始化 WebSocket
mounted() {
    this.initWebSocket()
  },
Logo

前往低代码交流专区

更多推荐