目标

现在微服务普遍流行,在对外连接上, Netty+Protobuf 通讯性能要优于 Http+Json方式,适合大数据高并发, 长连接异步通讯场景, 本教程主要讲解Spring Boot + Netty集成, 以及Netty+WebSocket+Protobuf的通讯配置。

脉络

  • Spring Boot 2.X + Netty集成配置
  • Spring Boot 2.X + Netty通讯测试
  • Spring Boot 2.X + Netty改造支持WebSocket
  • Spring Boot 2.X + Netty + WebSocket + Protobuf通讯测试
  • vue 客户端的调用

知行

1. Spring Boot 2.X + Netty集成配置

1.1 引入Netty包, 修改pom.xml依赖文件:

<!-- Netty 依赖组件 -->
<dependency>
    <groupId>io.netty</groupId>
    <artifactId>netty-all</artifactId>
    <version>4.1.36.Final</version>
</dependency>

1.2 Netty启动配置
在Spring Boot 启动完成之后, 再创建Netty服务。
在Spring Boot 服务启动类StockProxyApplication上, 增加ApplicationRunner的Run接口实现:

@SpringBootApplication
public class StockProxyApplication implements ApplicationRunner {

    @Value("${socket.netty.port:19999}")
    private int nettyPort;

    @Autowired
    private StockProxyServerInitializer stockProxyServerInitializer;
    
   /**
     * 启动Netty服务
     * @param args
     * @throws Exception
     */
    @Override
    public void run(ApplicationArguments args) throws Exception {

        // 创建网络服务器
        EventLoopGroup boss = new NioEventLoopGroup();
        // 创建Worker线程
        EventLoopGroup worker = new NioEventLoopGroup();

        try {
            // Netty服务启动类
            ServerBootstrap boot = new ServerBootstrap();
            // 采用NIO通道,自定义stockProxyServerInitializer初始化启动器
            boot.group(boss, worker).channel(NioServerSocketChannel.class)
            .childHandler(stockProxyServerInitializer);
            // 绑定端口, 同步阻塞监听
            ChannelFuture f = boot.bind(nettyPort).sync();
            // 开启channel监听器, 监听关闭动作
            f.channel().closeFuture().sync();

        } catch (InterruptedException e) {
            log.error("Can't start Netty Server Process", e);
            return;
        } finally {
            // 采用优雅的关闭方式
            boss.shutdownGracefully();
            worker.shutdownGracefully();
        }

    }
}

1.3 Channel初始器配置
StockProxyServerInitializer作为自定义服务通道的初始化设置, 代码:

@Component
public class StockProxyServerInitializer extends ChannelInitializer<SocketChannel> {

    @Value("${socket.netty.debug:false}")
    private boolean debug;

    @Autowired
    private StockProxyServerHandler stockProxyServerHandler;


	@Override
	protected void initChannel(SocketChannel ch) throws Exception {
		ChannelPipeline pipeline = ch.pipeline();

		// 可以打印出报文的请求和响应细节
        if(debug) {
            pipeline.addLast(new LoggingHandler());
        }
        //解码器,通过Google Protocol Buffers序列化框架动态的切割接收到的ByteBuf
        pipeline.addLast(new ProtobufVarint32FrameDecoder());
        //服务器端接收的是约定的StockMessage对象,所以这边将接收对象进行解码
        pipeline.addLast(new ProtobufDecoder(StockMessage.getDefaultInstance()));
        //Google Protocol Buffers编码器
        pipeline.addLast(new ProtobufVarint32LengthFieldPrepender());
        //Google Protocol Buffers编码器
        pipeline.addLast(new ProtobufEncoder());
        // 自定义数据接收处理器
        pipeline.addLast(stockProxyServerHandler);

	}

}

1.4 数据接收处理器
StockProxyServerHandler代码:

@Component
// 代表可以被多个channel线程安全共享
@Sharable
@Log4j2
public class StockProxyServerHandler extends SimpleChannelInboundHandler<StockMessage> {

	@Autowired
    private TaskExecutor taskExecutor;

    /**
     * 负责客户端Channel管理(线程安全)
     */
    public static ChannelGroup channels = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);

 /**
     * 接收处理客户端发送数据
     * @param channelHandlerContext
     * @param stockMessage
     * @throws Exception
     */
    @Override
    protected void channelRead0(ChannelHandlerContext channelHandlerContext, StockMessage stockMessage) throws Exception {
		// 异步处理
        taskExecutor.execute(() ->{
            try {
            	// 异步线程处理业务逻辑
            	...
            }catch(Exception e){
            	...
            }
    }

	...
}

1.5 配置文件
bootstrap-dev.yml配置文件内容:

server:
  port: 10693
spring:
  application:
    name: stock-proxy

# Netty 服务配置 (自定义端口)
socket:
  netty:
    port: 19999
    debug: true
...

2. Spring Boot 2.X + Netty通讯测试

2.1 测试客户端
NettyClientTest代码:

@Log4j2
public class NettyClientTest {
    private static String serverIp = "127.0.0.1";
    private static int serverPort = 19999;

    public static void main(String[] args) throws Exception {
    	// 创建事件循环组
        EventLoopGroup group = new NioEventLoopGroup();
        // 创建启动服务
        Bootstrap bootstrap = new Bootstrap();
        // 注册管理
        bootstrap.group(group)
                .channel(NioSocketChannel.class)
                .option(ChannelOption.TCP_NODELAY, true)
                .handler(new ChannelInitializer<SocketChannel>() {
                    @Override
                    protected void initChannel(SocketChannel ch) {
                        ch.pipeline()
                        		//解码器,通过Google Protocol Buffers序列化框架动态的切割接收到的ByteBuf
                                .addLast(new ProtobufVarint32FrameDecoder())
                                // 按约定的ResponseMessage对象,进行解码
                                .addLast(new ProtobufDecoder(ResponseMessage.getDefaultInstance()))
                                //Google Protocol Buffers 长度属性编码器
                                .addLast(new ProtobufVarint32LengthFieldPrepender())
                                // Protocol Buffers 编码器
                                .addLast(new ProtobufEncoder())
                                // 自定义数据接收处理器
                                .addLast(new ClientBoundHandler(method));

                    }
                });
		// 建立连接
        ChannelFuture future = bootstrap.connect(serverIp, serverPort).sync();
        log.info("client is connected.");
        future.channel().closeFuture().sync();
        group.shutdownGracefully();
        log.info("client is closed!");
    }
}

2.2 自定义数据接收处理器
ClientBoundHandler代码:

@Log4j2
public class ClientBoundHandler extends SimpleChannelInboundHandler<ResponseMessage> {

    @Override
    public void channelActive(ChannelHandlerContext ctx) {
        log.info("========== Server Connected ===========");
        ...
    }

    @Override
    protected void channelRead0(ChannelHandlerContext ctx, ResponseMessage msg) throws Exception {
       StockHeadProto.StockHead stockHead = msg.getStockHead();
       // 根据消息类型处理Protobuf消息
       ...
    }


}

2.3 启动客户端验证
发送登陆请求, 进行验证。
在这里插入图片描述
可以看到客户端与服务端可以成功收发消息, 具体Protobuf和业务逻辑实现代码就不贴出了, 主要掌握
Spring Boot + Netty+Protobuf的集成配置。

3. 改造支持WebSocket

上面已经集成了Netty服务, 但并不能直接支持Websocket方式连入, 我们就以上面的例子为基础, 讲下如何改造。

3.1 修改服务端的初始器配置
StockProxyServerInitializer代码:


public class StockProxyServerInitializer extends ChannelInitializer<SocketChannel> {

    @Value("${socket.netty.debug:false}")
    private boolean debug;

    @Autowired
    private StockProxyServerHandler stockProxyServerHandler;


	@Override
	protected void initChannel(SocketChannel ch) throws Exception {
		ChannelPipeline pipeline = ch.pipeline();

		// 在调试期加入日志功能,从而可以打印出报文的请求和响应细节
        if(debug) {
            pipeline.addLast(new LoggingHandler());
        }
        pipeline.addLast(new HttpServerCodec());
        // 支持参数对象解析, 比如POST参数, 设置聚合内容的最大长度
        pipeline.addLast(new HttpObjectAggregator(65536));
        // 支持大数据流写入
        pipeline.addLast(new ChunkedWriteHandler());
        // 支持WebSocket数据压缩
        pipeline.addLast(new WebSocketServerCompressionHandler());
        // Websocket协议配置, 设置访问路径
        pipeline.addLast(new WebSocketServerProtocolHandler("/stock", null, true));

        //解码器,通过Google Protocol Buffers序列化框架动态的切割接收到的ByteBuf
        pipeline.addLast(new ProtobufVarint32FrameDecoder());
        //Google Protocol Buffers 长度属性编码器
        pipeline.addLast(new ProtobufVarint32LengthFieldPrepender());


        // 协议包解码
        pipeline.addLast(new MessageToMessageDecoder<WebSocketFrame>() {
            @Override
            protected void decode(ChannelHandlerContext ctx, WebSocketFrame frame, List<Object> objs) throws Exception {
                log.info("received client msg ------------------------");
                if (frame instanceof TextWebSocketFrame) {
                	// 文本消息
                    TextWebSocketFrame textFrame = (TextWebSocketFrame)frame;
                    log.info("MsgType is TextWebSocketFrame");
                } else if (frame instanceof BinaryWebSocketFrame) {
                	// 二进制消息
                    ByteBuf buf = ((BinaryWebSocketFrame) frame).content();
                    objs.add(buf);
                    // 自旋累加
                    buf.retain();
                    log.info("MsgType is BinaryWebSocketFrame");
                } else if (frame instanceof PongWebSocketFrame) {
                	// PING存活检测消息
                    log.info("MsgType is PongWebSocketFrame ");
                } else if (frame instanceof CloseWebSocketFrame) {
                	// 关闭指令消息
                    log.info("MsgType is CloseWebSocketFrame");
                    ch.close();
                }

            }
        });
        // 协议包编码
        pipeline.addLast(new MessageToMessageEncoder<MessageLiteOrBuilder>() {
            @Override
            protected void encode(ChannelHandlerContext ctx, MessageLiteOrBuilder msg, List<Object> out) throws Exception {
                ByteBuf result = null;
                if (msg instanceof MessageLite) {
                	// 没有build的Protobuf消息
                    result = wrappedBuffer(((MessageLite) msg).toByteArray());
                }
                if (msg instanceof MessageLite.Builder) {
                	// 经过build的Protobuf消息
                    result = wrappedBuffer(((MessageLite.Builder) msg).build().toByteArray());
                }

                // 将Protbuf消息包装成Binary Frame 消息
                WebSocketFrame frame = new BinaryWebSocketFrame(result);
                out.add(frame);
            }
        });
		// Protobuf消息解码器
        pipeline.addLast(new ProtobufDecoder(StockMessage.getDefaultInstance()));

        // 自定义数据处理器
        pipeline.addLast(stockProxyServerHandler);

	}

}


WebSocket 支持主要是增加HttpServerCodec、WebSocketServerProtocolHandler配置,
注意WebSocket发送的Protobuf消息要再做一层编码封装, 以Binary Frame消息发送至客户端, 这样前端才能支持, 比如VUE, 通过二进制字符串码进行解析。

4. Spring Boot 2.X + Netty + WebSocket + Protobuf通讯测试

4.1 客户端连接改造

NettyClientTest代码:

@Log4j2
public class NettyClientTest {
    private static String serverIp = "127.0.0.1";
    private static int serverPort = 19999;

    public static void main(String[] args) throws Exception {
    	// 创建事件循环组
        EventLoopGroup group = new NioEventLoopGroup();
        // 创建启动服务
        Bootstrap bootstrap = new Bootstrap();
        // 注册管理
        bootstrap.group(group)
                .channel(NioSocketChannel.class)
                .option(ChannelOption.TCP_NODELAY, true)
                .handler(new ChannelInitializer<SocketChannel>() {
                    @Override
                    protected void initChannel(SocketChannel ch) {
                        ch.pipeline()
                        		// 增加Http Client解码支持
                        		pipeline.addLast(new HttpClientCodec());
                        		// 增加支持参数对象解析, 比如POST参数, 设置聚合内容的最大长度
                        		pipeline.addLast(new HttpObjectAggregator(65536));
                        		//解码器,通过Google Protocol Buffers序列化框架动态的切割接收到的ByteBuf
                                .addLast(new ProtobufVarint32FrameDecoder())                                
                                //Google Protocol Buffers 长度属性编码器
                                .addLast(new ProtobufVarint32LengthFieldPrepender())
                                // 自定义数据接收处理器
                                .addLast("client_handler", new ClientBoundHandler(method))
                                // Protobuf消息解码器
                                .addLast(new ProtobufDecoder(ResponseMessage.getDefaultInstance()))
								// 协议包解码
		                        .addLast(new MessageToMessageDecoder<WebSocketFrame>() {
		                            @Override
		                            protected void decode(ChannelHandlerContext ctx, WebSocketFrame frame, 
		                            List<Object> objs) throws Exception {
		                                log.info("MessageToMessageDecoder msg ------------------------");
		                                if (frame instanceof TextWebSocketFrame) {
		                                    TextWebSocketFrame textFrame = (TextWebSocketFrame)frame;
		                                    log.info("TextWebSocketFrame");
		                                } else if (frame instanceof BinaryWebSocketFrame) {
		                                    ByteBuf buf = ((BinaryWebSocketFrame) frame).content();
		                                    objs.add(buf);
		                                    buf.retain();
		                                    log.info("BinaryWebSocketFrame received------------------------");
		                                } else if (frame instanceof PongWebSocketFrame) {
		                                    log.info("WebSocket Client received pong");
		                                } else if (frame instanceof CloseWebSocketFrame) {
		                                    log.info("receive close frame");
		                                }
		
		                            }
		                        })		
		                        // 协议包编码
		                        .addLast(new MessageToMessageEncoder<MessageLiteOrBuilder>() {
		                            @Override
		                            protected void encode(ChannelHandlerContext ctx, MessageLiteOrBuilder msg, List<Object> out) throws Exception {
		                                ByteBuf result = null;
		                                if (msg instanceof MessageLite) {
		                                    // 没有build的Protobuf消息
		                                    result = wrappedBuffer(((MessageLite) msg).toByteArray());
		                                }
		                                if (msg instanceof MessageLite.Builder) {
		                                    // 经过build的Protobuf消息
		                                    result = wrappedBuffer(((MessageLite.Builder) msg).build().toByteArray());
		                                }
		
		                                // 将Protbuf消息包装成Binary Frame 消息		
		                                WebSocketFrame frame = new BinaryWebSocketFrame(result);
		                                out.add(frame);
		                            }
		                        });

                    }
                });
		
		// 建立连接
        ChannelFuture future;
        try {
            URI websocketURI = new URI(String.format("ws://%s:%d/stock", serverIp, serverPort));
            HttpHeaders httpHeaders = new DefaultHttpHeaders();
            //进行握手
            WebSocketClientHandshaker handshaker = WebSocketClientHandshakerFactory.newHandshaker(websocketURI, WebSocketVersion.V13, (String)null, true,httpHeaders);
            log.debug("-ready to connect :"+ handshaker);
            // 获取连接通道
            Channel channel = bootstrap.connect(websocketURI.getHost(), websocketURI.getPort()).sync().channel();
            // 获取自定义接收数据处理器
            ClientBoundHandler handler = (ClientBoundHandler)channel.pipeline().get("client_handler");
            handler.setHandshaker(handshaker);
            // 通过它构造握手响应消息返回给客户端,
            handshaker.handshake(channel);
            //阻塞等待是否握手成功
            future = handler.handshakeFuture().sync();
            log.info("----channel:"+future.channel());


        } catch (Exception e) {
            log.error(e.getMessage(),e);
        }
    }
}
  • 可以看到, 在连接配置上增加了HttpClientCodec, 以及MessageToMessage的编码和解码器, 这里面要对Binary Frame的数据做处理。
  • 在连接处理上, 增加了WebSocketClientHandshaker, 用于WebSocket的初始化连接建立,普通socket连接是不需要这个不走, 这是WebSocket协议规范; 通过URI指定具体路径, 路径写错, 会导致连接不上。

4.2 连接验证

在这里插入图片描述
客户端与服务端成功接口websocket连接, Protobuf格式的登陆数据包, 能够收发成功。

4.3 附带VUE调用示例

  • js protobuf 插件: https://www.npmjs.com/package/protobufjs

  • npm安装protobuf插件

    npm install protobufjs
    

    安装成功提示:

    + protobufjs@6.8.8
    added 14 packages from 39 contributors and audited 764 packages in 10.991s
    found 0 vulnerabilities
    
  • 代码实现

    // 引入Protobuf 组件
    import protobuf from 'protobufjs';
    // 引入编译后的Protobuf数据结构信息
    import StockReqMessage from '@/proto/StockReqMessage.json';
    ...
    // 定义登陆数据接口信息
    let loginMessage= 
    {
    	stockReqHead: {
    		ProtoVer: 'VER_1',
    		RequestType: 'USER_LOGIN',
    		seqId: 1000001
    	},
    	loginReqData:{
    		userNo: 'admin',
    		userPwd: 'test',
    	}
    }
    // 获取Protobuf父级数据结构
    let stockReqMessageRoot = protobuf.Root.fromJSON(StockReqMessage);
    // 获取请求数据包结构
    let stockReqMessage = AwesomeRoot.lookupType('StockReqMessage');
    // 创建登陆数据接口信息
    let stockReqMessageData = stockReqMessage.create(loginMessage);
    // 将登陆数据接口信息进行编码
    let protobufData= stockReqMessage.encode(stockReqMessageData ).finish();
    // 通过websocket发送数据
    this.websocket.send(protobufData);
    

    通过Protobuf插件能够快速帮助实现,没有java客户端的繁琐配置, 这里就介绍vue前端的主要实现步骤,仅供参考。

合一

  • Netty 在 Websocket配置上并不复杂, 只是要遵循websocket通讯协议, 本教程采用的是JAVA客户端,相比普通的Socket连接, 会增加一些配置, 但从使用和业务实现上, 还是按照原逻辑处理, 不会影响。
  • 前端通过JS实现的Protobuf插件, 能够简便快捷的实现基于Websocket+Protobuf通讯。
  • Websocket + Protobuf 通讯方式,无论从连接开销还是数据传输和处理开销, 都要优于Rest Http + Json 方式,广泛适用于数据量大, 性能要求高的业务场景。

本文由mirson创作分享,如需进一步交流,请加QQ群:19310171或访问www.softart.cn

Logo

权威|前沿|技术|干货|国内首个API全生命周期开发者社区

更多推荐