Gateway 管理 Netty 长连接(下) - Java 客户端与 VUE 连接Gateway - Netty
Spring Cloud Gateway 管理 Netty : https://editor.csdn.net/md/?articleId=105424618上篇文章中简单概述了一下网关路由长连接的思路这篇文章来贴一下代码Java 通过 SocketClient 连接 NettyVUE 通过 WebSocket 连接 Netty提要:本文章中有关 Netty 和 Clien...
·
Spring Cloud Gateway 管理 Netty : https://editor.csdn.net/md/?articleId=105424618
上篇文章中简单概述了一下网关路由长连接的思路
这篇文章来贴一下代码
- Java 通过 SocketClient 连接 Netty
- VUE 通过 WebSocket 连接 Netty
提要:
本文章中有关 Netty 和 Client 以及 Socket 的代码仅作为网关路由长连接案例,非网关路由请查看直连文档,以免出现无法预想的问题。
首先是 Netty - WebSocket 服务端的搭建
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.http.HttpObjectAggregator;
import io.netty.handler.codec.http.HttpServerCodec;
import io.netty.handler.codec.http.websocketx.WebSocketServerProtocolHandler;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;
import io.netty.handler.stream.ChunkedWriteHandler;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Configuration;
import java.net.InetSocketAddress;
/**
* Netty 服务端
* @author cuixiaojian
*/
@Slf4j
@Configuration
public class NettyServer {
/**
* Netty 端口号
*/
@Value("${netty.port}")
private Integer nettyPort;
/**
* 启动方法
* @throws Exception
*/
public void startServer() throws Exception {
// Netty 线程组
EventLoopGroup bossGroup = new NioEventLoopGroup();
EventLoopGroup wokerGroup = new NioEventLoopGroup();
try{
// Netty 启动类
ServerBootstrap serverBootstrap = new ServerBootstrap();
serverBootstrap.group(bossGroup,wokerGroup).channel(NioServerSocketChannel.class)
.handler(new LoggingHandler(LogLevel.INFO))
.option(ChannelOption.SO_KEEPALIVE,true)
.option(ChannelOption.SO_BACKLOG,1024 * 1024 * 10)
// 设置处理器
.childHandler(new ChannelInitializer<SocketChannel>(){
@Override
protected void initChannel(SocketChannel socketChannel) throws Exception {
ChannelPipeline pipeline = socketChannel.pipeline();
pipeline.addLast("http-codec",new HttpServerCodec());
pipeline.addLast("http-chunked",new ChunkedWriteHandler());
pipeline.addLast("aggregator",new HttpObjectAggregator(1024*1024*1024));
pipeline.addLast(new WebSocketServerProtocolHandler("/notice",null,true,65535));
// 自定义处理器
pipeline.addLast(new WebSocketHandle());
}
});
ChannelFuture channelFuture = serverBootstrap.bind(new InetSocketAddress(nettyPort)).sync();
if (channelFuture.isSuccess()) {
log.info("Netty服务端启动成功, 端口号:{}",nettyPort);
}
channelFuture.channel().closeFuture().sync();
} catch (Exception e) {
log.error("Netty服务端启动失败,异常信息为: {}",e);
} finally {
bossGroup.shutdownGracefully();
wokerGroup.shutdownGracefully();
}
}
}
自定义处理器
import com.xnyzc.lhy.common.util.CheckUtil;
import com.xnyzc.lhy.notice.entity.message.SysSocketUser;
import com.xnyzc.lhy.notice.netty.param.NettyMessage;
import com.xnyzc.lhy.notice.netty.util.NettyMsg;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.channel.group.ChannelGroup;
import io.netty.channel.group.DefaultChannelGroup;
import io.netty.handler.codec.http.websocketx.*;
import io.netty.util.concurrent.GlobalEventExecutor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.context.annotation.Configuration;
import java.net.InetSocketAddress;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
/**
* 自定义的handler类
* @author cuixiaojian
*/
@Slf4j
@Configuration
public class WebSocketHandle extends SimpleChannelInboundHandler<Object> {
//客户端组
public static ChannelGroup channelGroup;
static {
channelGroup = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);
}
/**
* 存储 Channel 的容器
*/
private static ConcurrentMap<String, Channel> channelMap = new ConcurrentHashMap<>();
/**
* Handler活跃状态,连接成功
*/
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
// 将通道放入组
channelGroup.add(ctx.channel());
}
/**
* 通道读取
*/
@Override
protected void channelRead0(ChannelHandlerContext ctx, Object msg) throws Exception {
// 文本消息 (约定 与客户端 Socket 消息类型为文本消息)
if (msg instanceof TextWebSocketFrame) {
// 获取当前channel绑定的IP地址
InetSocketAddress ipSocket = (InetSocketAddress) ctx.channel().remoteAddress();
String address = ipSocket.getAddress().getHostAddress();
log.info("获取到远程连接:IP:{}", address);
TextWebSocketFrame textFrame = (TextWebSocketFrame) msg;
String message = textFrame.text();
log.info("收到来自客户端消息: {}", message);
// 此处 NettyMsg 类为自定义消息解析器,通过 message 串来解析消息。
NettyMessage nettyMessage = NettyMsg.getNettyMessage(message);
// 解析成为自定义消息对象
SysSocketUser sysSocketUser = NettyMsg.getCon(nettyMessage.getCon(), SysSocketUser.class);
// 与客户端约定建立连接成功时,发送一条包含用户ID的消息,此处从消息中获取用户ID与Channel绑定,后续为用户发送消息时使用。
if (CheckUtil.objIsNotEmpty(sysSocketUser)) {
if (CheckUtil.objIsNotEmpty(sysSocketUser.getUserId())) {
//将 用户 和 Channel 的关系保存
if (!channelMap.containsKey(sysSocketUser.getUserId())) {
channelMap.put(sysSocketUser.getUserId(), ctx.channel());
}
}
}
}
// PING 类型消息
if (msg instanceof PongWebSocketFrame) {
log.info("PING SUCCESS");
}
// 请求关闭连接类型消息
if (msg instanceof CloseWebSocketFrame) {
log.info("客户端关闭连接,服务端关闭通道");
Channel channel = ctx.channel();
channel.close();
}
}
/**
* 未注册状态
*/
@Override
public void channelUnregistered(ChannelHandlerContext ctx) throws Exception {
log.info("等待连接");
}
/**
* 非活跃状态,没有连接远程主机的时候。
*/
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
log.info("客户端关闭");
channelGroup.remove(ctx.channel());
}
/**
* 异常处理
*/
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
log.error("连接异常:" + cause.getMessage());
ctx.close();
}
/**
* 给指定用户发内容
* 可以使用此方法推送消息给客户端
*/
public void sendMessage(String userId, String message) {
Channel channel = channelMap.get(userId);
channel.writeAndFlush(new TextWebSocketFrame(message));
}
/**
* 群发消息
*/
public void sendMessageAll(String message) {
channelGroup.writeAndFlush(new TextWebSocketFrame(message));
}
}
Spring Boot 启动后一并启动 Netty 服务
public class NoticeApplication {
public static void main(String[] args) {
ApplicationContext context = SpringApplication.run(NoticeApplication.class, args);
NettyServer nettyServer = context.getBean(NettyServer.class);
try {
nettyServer.startServer();
} catch (Exception e) {
System.out.println("netty 启动失败");
}
}
}
其次是客户端与Gateway建立连接
1. Java 客户端连接
1. 客户端服务
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.http.DefaultHttpHeaders;
import io.netty.handler.codec.http.HttpClientCodec;
import io.netty.handler.codec.http.HttpHeaders;
import io.netty.handler.codec.http.HttpObjectAggregator;
import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
import io.netty.handler.codec.http.websocketx.WebSocketClientHandshaker;
import io.netty.handler.codec.http.websocketx.WebSocketClientHandshakerFactory;
import io.netty.handler.codec.http.websocketx.WebSocketVersion;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;
import lombok.extern.slf4j.Slf4j;
import java.net.URI;
/**
* Netty - WebSocket 客户端
* @author cuixiaojian
*/
@Slf4j
public class NettyClient {
public static void main(String[] args) throws Exception {
// Netty 线程组
EventLoopGroup group = new NioEventLoopGroup();
// Netty 启动类
Bootstrap boot = new Bootstrap();
boot.option(ChannelOption.SO_KEEPALIVE, true)
.option(ChannelOption.TCP_NODELAY, true)
.group(group)
.handler(new LoggingHandler(LogLevel.INFO))
.channel(NioSocketChannel.class)
.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel socketChannel) throws Exception {
ChannelPipeline pipeline = socketChannel.pipeline();
pipeline.addLast("http-codec",new HttpClientCodec());
pipeline.addLast("aggregator",new HttpObjectAggregator(1024*1024*10));
pipeline.addLast("hookedHandler", new WebSocketClientHandler());
}
});
// 这里的路径即为网关中配置的路由路径
URI webSocketURI = new URI("ws://localhost:30000/nio/v1");
HttpHeaders httpHeaders = new DefaultHttpHeaders();
// 握手
WebSocketClientHandshaker handshaker = WebSocketClientHandshakerFactory.newHandshaker(webSocketURI, WebSocketVersion.V13, (String) null, true, httpHeaders);
// 连接通道
final Channel channel = boot.connect(webSocketURI.getHost(), webSocketURI.getPort()).sync().channel();
WebSocketClientHandler handler = (WebSocketClientHandler) channel.pipeline().get("hookedHandler");
handler.setHandshaker(handshaker);
handshaker.handshake(channel);
// 阻塞等待是否握手成功
handler.handshakeFuture().sync();
// 给服务端发送的内容,如果客户端与服务端连接成功后,可以多次掉用这个方法发送消息
sendMessage(channel);
}
private static void sendMessage(Channel channel){
//发送的内容,是一个文本格式的内容
// 此处NettyMsg为自定义消息解析器,setJsonMsg为设置一条Json串消息
String putMessage = NettyMsg.setJsonMsg("NOTICE", "{\"userId\": \"123456\"}");
TextWebSocketFrame frame = new TextWebSocketFrame(putMessage);
channel.writeAndFlush(frame).addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture channelFuture) throws Exception {
if (channelFuture.isSuccess()) {
System.out.println("消息发送成功,发送的消息是:" + putMessage);
} else {
System.out.println("消息发送失败 " + channelFuture.cause().getMessage());
}
}
});
}
}
2. 客户端处理器
import io.netty.channel.*;
import io.netty.handler.codec.http.FullHttpResponse;
import io.netty.handler.codec.http.websocketx.*;
import io.netty.util.CharsetUtil;
/**
* @author cuixiaojian
*/
public class WebSocketClientHandler extends SimpleChannelInboundHandler<Object> {
/**
* 握手的状态信息
*/
private WebSocketClientHandshaker handshaker;
/**
* netty自带的异步处理
*/
private ChannelPromise handshakeFuture;
@Override
protected void channelRead0(ChannelHandlerContext ctx, Object msg) throws Exception {
System.out.println("当前握手的状态"+this.handshaker.isHandshakeComplete());
Channel ch = ctx.channel();
FullHttpResponse response;
// 进行握手操作
if (!this.handshaker.isHandshakeComplete()) {
try {
response = (FullHttpResponse)msg;
// 握手协议返回,设置结束握手
this.handshaker.finishHandshake(ch, response);
// 设置成功
this.handshakeFuture.setSuccess();
System.out.println("服务端的消息"+response.headers());
} catch (WebSocketHandshakeException var7) {
FullHttpResponse res = (FullHttpResponse)msg;
String errorMsg = String.format("握手失败,status:%s,reason:%s", res.status(), res.content().toString(CharsetUtil.UTF_8));
this.handshakeFuture.setFailure(new Exception(errorMsg));
}
} else if (msg instanceof FullHttpResponse) {
response = (FullHttpResponse)msg;
throw new IllegalStateException("Unexpected FullHttpResponse (getStatus=" + response.status() + ", content=" + response.content().toString(CharsetUtil.UTF_8) + ')');
} else {
// 接收服务端的消息
WebSocketFrame frame = (WebSocketFrame)msg;
// 文本信息
if (frame instanceof TextWebSocketFrame) {
TextWebSocketFrame textFrame = (TextWebSocketFrame)frame;
System.out.println("客户端接收的消息是:"+textFrame.text());
}
// 二进制信息
if (frame instanceof BinaryWebSocketFrame) {
BinaryWebSocketFrame binFrame = (BinaryWebSocketFrame)frame;
System.out.println("BinaryWebSocketFrame");
}
// PING信息
if (frame instanceof PongWebSocketFrame) {
System.out.println("WebSocket Client received pong");
}
// 关闭消息
if (frame instanceof CloseWebSocketFrame) {
System.out.println("receive close frame");
ch.close();
}
}
}
/**
* Handler活跃状态,表示连接成功
*/
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
System.out.println("与服务端连接成功");
}
/**
* 非活跃状态,没有连接远程主机的时候。
*/
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
System.out.println("主机关闭");
}
/**
* 异常处理
*/
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
System.out.println("连接异常:"+cause.getMessage());
ctx.close();
}
@Override
public void handlerAdded(ChannelHandlerContext ctx) {
this.handshakeFuture = ctx.newPromise();
}
public WebSocketClientHandshaker getHandshaker() {
return handshaker;
}
void setHandshaker(WebSocketClientHandshaker handshaker) {
this.handshaker = handshaker;
}
public ChannelPromise getHandshakeFuture() {
return handshakeFuture;
}
public void setHandshakeFuture(ChannelPromise handshakeFuture) {
this.handshakeFuture = handshakeFuture;
}
ChannelFuture handshakeFuture() {
return this.handshakeFuture;
}
2. VUE 客户端连接
// 初始化weosocket
initWebSocket() {
// 设置网关路由地址
const wsuri = 'ws://127.0.0.1:30000/nio/v1'
this.websock = new WebSocket(wsuri)
this.websock.onmessage = this.websocketonmessage
this.websock.onopen = this.websocketonopen
this.websock.onerror = this.websocketonerror
this.websock.onclose = this.websocketclose
},
// 连接建立之后执行send方法发送数据
websocketonopen() {
const actions = { 'code': '0', 'method': 'NOTICE', 'msg': '', 'con': '{\'userId\':\'1\'}' }
this.websocketsend(JSON.stringify(actions))
},
// 连接建立失败重连
websocketonerror() {
// this.initWebSocket()
},
// 数据接收
websocketonmessage(e) {
// 解析为Json消息
// const redata = JSON.parse(e.data)
// element-ui 通知组件
this.$notify({
title: '通知',
message: '您有一条新消息'
})
// 自定义接收到消息后的处理
this.getMessage()
},
// 数据发送
websocketsend(Data) {
this.websock.send(Data)
},
// 关闭
websocketclose(e) {
console.log('断开连接', e)
}
在钩子中初始化 WebSocket
mounted() {
this.initWebSocket()
},
更多推荐
已为社区贡献1条内容
所有评论(0)