什么是netty?

官方解释:

Netty is a NIO client server framework which enables quick and easy development of network applications such as protocol servers and clients. It greatly simplifies and streamlines network programming such as TCP and UDP socket server.Netty is an asynchronous event-driven network application framework 

解释一下就是:Netty是一个NIO网络编程框架,快速开发高性能、高可靠性的网络服务器/客户端程序。 极大地简化了TCP和UDP等网络编程。是一个异步事件驱动的网络框架。

重点是NIO、快速、高性能。

哪些中间件用到了netty?

RPC(pigeon、dubbo、HSF)Hadoop、SparkMQ(swallow、RocketMQ)Zookeeper等

最核心的三大组件

缓冲(Buffer)通道(Channel)事件模型(Event Model)

netty的组件

Bootstrap:netty的辅助启动器,netty客户端和服务器的入口,Bootstrap是创建客户端连接的启动器,ServerBootstrap是监听服务端端口的启动器,跟tomcat的Bootstrap类似,程序的入口。

Channel:关联jdk原生socket的组件,常用的是NioServerSocketChannel和NioSocketChannel,NioServerSocketChannel负责监听一个tcp端口,有连接进来通过boss reactor创建一个NioSocketChannel将其绑定到worker reactor,然后worker reactor负责这个NioSocketChannel的读写等io事件。

EventLoop:netty最核心的几大组件之一,就是我们常说的reactor,人为划分为boss reactor和worker reactor。通过EventLoopGroup(Bootstrap启动时会设置EventLoopGroup)生成,最常用的是nio的NioEventLoop,就如同EventLoop的名字,EventLoop内部有一个无限循环,维护了一个selector,处理所有注册到selector上的io操作,在这里实现了一个线程维护多条连接的工作。

ChannelPipeline:netty最核心的几大组件之一,ChannelHandler的容器,netty处理io操作的通道,与ChannelHandler组成责任链。write、read、connect等所有的io操作都会通过这个ChannelPipeline,依次通过ChannelPipeline上面的ChannelHandler处理,这就是netty事件模型的核心。ChannelPipeline内部有两个节点,head和tail,分别对应着ChannelHandler链的头和尾。

ChannelHandler:netty最核心的几大组件之一,netty处理io事件真正的处理单元,开发者可以创建自己的ChannelHandler来处理自己的逻辑,完全控制事件的处理方式。ChannelHandler和ChannelPipeline组成责任链,使得一组ChannelHandler像一条链一样执行下去。ChannelHandler分为inBound和outBound,分别对应io的read和write的执行链。ChannelHandler用ChannelHandlerContext包裹着,有prev和next节点,可以获取前后ChannelHandler,read时从ChannelPipeline的head执行到tail,write时从tail执行到head,所以head既是read事件的起点也是write事件的终点,与io交互最紧密。

Unsafe:顾名思义这个类就是不安全的意思,但并不是说这个类本身不安全,而是不要在应用程序里面直接使用Unsafe以及他的衍生类对象,实际上Unsafe操作都是在reactor线程中被执行。Unsafe是Channel的内部类,并且是protected修饰的,所以在类的设计上已经保证了不被用户代码调用。Unsafe的操作都是和jdk底层相关。EventLoop轮询到read或accept事件时,会调用unsafe.read(),unsafe再调用ChannelPipeline去处理事件;当发生write事件时,所有写事件都会放在EventLoop的task中,然后从ChannelPipeline的tail传播到head,通过Unsafe写到网络中。

示例代码

(找netty包下的)https://gitee.com/zhang-xiao-xiang/all-demo-parent

主要代码示例(实现客户端发送一个请求,服务器会返回一个字符串)

package com.example.java.base.netty;

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;
 
/**
 * HelloServer:实现客户端发送一个请求,服务器会返回 hello netty
 * @author zhangxiaoxiang
 * @date 2019/6/15
 */
public class HelloServer {
	public static void main(String[] args) throws Exception {
		// 定义一对线程组
		// 主线程组, 用于接受客户端的连接,但是不做任何处理,跟老板一样,不做事
		EventLoopGroup bossGroup = new NioEventLoopGroup();
		// 从线程组, 老板线程组会把任务丢给他,让手下线程组去做任务
		EventLoopGroup workerGroup = new NioEventLoopGroup();
		try {
			// netty服务器的创建, ServerBootstrap 是一个启动类
			ServerBootstrap serverBootstrap = new ServerBootstrap();
			// 设置主从线程组
			serverBootstrap.group(bossGroup, workerGroup)
					// 设置nio的双向通道
							.channel(NioServerSocketChannel.class)
					// 子处理器,用于处理workerGroup
							.childHandler(new HelloServerInitializer());
			
			// 启动server,并且设置8088为启动的端口号,同时启动方式为同步
			ChannelFuture channelFuture = serverBootstrap.bind(8088).sync();
			System.out.println("启动后浏览器访问 http://localhost:8088/");
			// 监听关闭的channel,设置位同步方式
			channelFuture.channel().closeFuture().sync();
		} finally {
			bossGroup.shutdownGracefully();
			workerGroup.shutdownGracefully();
		}
	}
}

 

package com.example.java.base.netty;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.handler.codec.http.DefaultFullHttpResponse;
import io.netty.handler.codec.http.FullHttpResponse;
import io.netty.handler.codec.http.HttpHeaderNames;
import io.netty.handler.codec.http.HttpObject;
import io.netty.handler.codec.http.HttpRequest;
import io.netty.handler.codec.http.HttpResponseStatus;
import io.netty.handler.codec.http.HttpVersion;
import io.netty.util.CharsetUtil;

/**
 * CustomHandler:创建自定义助手类 
 * SimpleChannelInboundHandler: 对于请求来讲,其实相当于[入站,入境]
 *
 * @author zhangxiaoxiang
 * @date 2019/6/15
 */
public class CustomHandler extends SimpleChannelInboundHandler<HttpObject> {
    @Override
    protected void channelRead0(ChannelHandlerContext ctx, HttpObject msg)
            throws Exception {
        // 获取channel
        Channel channel = ctx.channel();
        if (msg instanceof HttpRequest) {
            // 显示客户端的远程地址
            System.out.println(channel.remoteAddress());
            // 定义发送的数据消息(注意中文乱码的问题,idea编辑器,或者注意右下角那个编码多切换几次)
            ByteBuf content = Unpooled.copiedBuffer("Hello netty", CharsetUtil.UTF_8);
            // 构建一个http response
            FullHttpResponse response =
                    new DefaultFullHttpResponse(HttpVersion.HTTP_1_1,
                            HttpResponseStatus.OK,
                            content);
            // 为响应增加数据类型和长度
            response.headers().set(HttpHeaderNames.CONTENT_TYPE, "text/plain");
            response.headers().set(HttpHeaderNames.CONTENT_LENGTH, content.readableBytes());
            // 把响应刷到客户端
            ctx.writeAndFlush(response);
        }
    }

    @Override
    public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
        //idea 输入 soutm 可以快捷打印方法
        System.out.println("CustomHandler.channelRegistered 注册");
        super.channelRegistered(ctx);
    }

    @Override
    public void channelUnregistered(ChannelHandlerContext ctx) throws Exception {
        System.out.println("CustomHandler.channelUnregistered 移除");
        super.channelUnregistered(ctx);
    }

    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        System.out.println("CustomHandler.channelActive 活跃");
        super.channelActive(ctx);
    }

    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        System.out.println("CustomHandler.channelInactive 不活跃");
        super.channelInactive(ctx);
    }

    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
        System.out.println("CustomHandler.channelReadComplete 读取完毕");
        super.channelReadComplete(ctx);
    }

    @Override
    public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
        System.out.println("CustomHandler.userEventTriggered 用户事件触发");
        super.userEventTriggered(ctx, evt);
    }

    @Override
    public void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exception {
        System.out.println("CustomHandler.channelWritabilityChanged 可写更改");
        super.channelWritabilityChanged(ctx);
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        System.out.println("CustomHandler.exceptionCaught 捕获到异常");
        super.exceptionCaught(ctx, cause);
    }

    @Override
    public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
        System.out.println("CustomHandler.handlerAdded 助手类添加");
        super.handlerAdded(ctx);
    }

    @Override
    public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
        System.out.println("CustomHandler.handlerRemoved 助手类移除");
        super.handlerRemoved(ctx);
    }

}
package com.example.java.base.netty;

import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.socket.SocketChannel;
import io.netty.handler.codec.http.HttpServerCodec;

/**
 * HelloServerInitializer:初始化器,channel注册后,会执行里面的相应的初始化方法
 *
 * @author zhangxiaoxiang
 * @date 2019/6/15
 */
public class HelloServerInitializer extends ChannelInitializer<SocketChannel> {
    @Override
    protected void initChannel(SocketChannel channel) throws Exception {
        // 通过SocketChannel去获得对应的管道
        ChannelPipeline pipeline = channel.pipeline();
        // 通过管道,添加handler
        // HttpServerCodec是由netty自己提供的助手类,可以理解为拦截器
        // 当请求到服务端,我们需要做解码,响应到客户端做编码
        pipeline.addLast("HttpServerCodec", new HttpServerCodec());
        // 添加自定义的助手类,返回 "hello netty~"
        pipeline.addLast("customHandler", new CustomHandler());
    }
}

 编写一个websocket 测试demo

编写服务器端类 WsServer

package com.example.java.base.netty.websocket;

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;

/**
 * WsServer:WsServer
 *
 * @author zhangxiaoxiang
 * @date 2020/12/7
 */
public class WsServer {

	public static void main(String[] args) throws Exception {
		// 定义一对线程组
		EventLoopGroup mainGroup = new NioEventLoopGroup();
		EventLoopGroup subGroup = new NioEventLoopGroup();
		try {
			// netty服务器的创建, ServerBootstrap 是一个启动类
			ServerBootstrap server = new ServerBootstrap();
			server.group(mainGroup, subGroup)
				.channel(NioServerSocketChannel.class)
				.childHandler(new WsServerInitializer());
			// 启动server,并且设置8088为启动的端口号,同时启动方式为同步
			ChannelFuture future = server.bind(8088).sync();
			System.out.println("启动成功,然后运行index.html,并打开F12调试查看整个流程,并观察控制台,关闭浏览器再查看控制台");
			// 监听关闭的channel,设置位同步方式
			future.channel().closeFuture().sync();
		} finally {
			mainGroup.shutdownGracefully();
			subGroup.shutdownGracefully();
		}
	}
	
}

 编写初始化器类 WsServerInitializer

package com.example.java.base.netty.websocket;

import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.socket.SocketChannel;
import io.netty.handler.codec.http.HttpObjectAggregator;
import io.netty.handler.codec.http.HttpServerCodec;
import io.netty.handler.codec.http.websocketx.WebSocketServerProtocolHandler;
import io.netty.handler.stream.ChunkedWriteHandler;

/**
 * WsServerInitializer:初始化器
 *
 * @author zhangxiaoxiang
 * @date 2020/12/7
 */
public class WsServerInitializer extends ChannelInitializer<SocketChannel> {
	/**
	 * 注册后将调用此方法。 方法返回后,此实例将从Channel的ChannelPipeline中删除
	 * @param ch ch
	 * @throws Exception
	 */
	@Override
	protected void initChannel(SocketChannel ch) throws Exception {
		ChannelPipeline pipeline = ch.pipeline();
		// websocket 基于http协议,所以要有http编解码器
		pipeline.addLast(new HttpServerCodec());
		// 对写大数据流的支持 
		pipeline.addLast(new ChunkedWriteHandler());
		// 对httpMessage进行聚合,聚合成FullHttpRequest或FullHttpResponse
		// 几乎在netty中的编程,都会使用到此hanler
		pipeline.addLast(new HttpObjectAggregator(1024*64));
		
		// ====================== 以上是用于支持http协议    ======================
		// ====================== 以下是支持httpWebsocket ======================

		//websocket 服务器处理的协议,用于指定给客户端连接访问的路由 : /ws
		//本handler会帮你处理一些繁重的复杂的事
		//会帮你处理握手动作: handshaking(close, ping, pong) ping + pong = 心跳
		//对于websocket来讲,都是以frames进行传输的,不同的数据类型对应的frames也不同
		pipeline.addLast(new WebSocketServerProtocolHandler("/ws"));
		// 自定义的handler
		pipeline.addLast(new ChatHandler());
	}

}

编写处理消息的 ChatHandler

package com.example.java.base.netty.websocket;

import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.channel.group.ChannelGroup;
import io.netty.channel.group.DefaultChannelGroup;
import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
import io.netty.util.concurrent.GlobalEventExecutor;

import java.time.LocalDateTime;


/**
 * ChatHandler:处理消息的handler
 * TextWebSocketFrame: 在netty中,是用于为websocket专门处理文本的对象,frame是消息的载体
 *
 * @author zhangxiaoxiang
 * @date 2020/12/7
 */
public class ChatHandler extends SimpleChannelInboundHandler<TextWebSocketFrame> {

    /**
     * 用于记录和管理所有客户端的channle
     */
    private static ChannelGroup clients = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);

    @Override
    protected void channelRead0(ChannelHandlerContext ctx, TextWebSocketFrame msg) throws Exception {
        // 获取客户端传输过来的消息
        String content = msg.text();
        System.out.println("接受到的数据:" + content);
        // for (Channel channel : clients) {
        //     channel.writeAndFlush(new TextWebSocketFrame("[服务器在]" + LocalDateTime.now() + "接受到消息, 消息为:" + content));
        // }
        // 下面这个方法,和上面的for循环,一致
        clients.writeAndFlush(new TextWebSocketFrame("[服务器在]" + LocalDateTime.now() + "接受到消息, 消息为:" + content));

    }

    /**
     * 当客户端连接服务端之后(打开连接)
     * 获取客户端的channle,并且放到ChannelGroup中去进行管理
     * @param ctx ctx
     * @throws Exception e
     */
    @Override
    public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
        clients.add(ctx.channel());
    }

    @Override
    public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
        // 当触发handlerRemoved,ChannelGroup会自动移除对应客户端的channel
        // clients.remove(ctx.channel());
        System.out.println("客户端断开,channle对应的长id为:" + ctx.channel().id().asLongText());
        System.out.println("客户端断开,channle对应的短id(长ID的最后一段)为:" + ctx.channel().id().asShortText());
    }


}

编写HTML测试

<!DOCTYPE html>
<html>
	<head>
		<meta charset="utf-8" />
		<title>netty 测试消息发送和接收</title>
	</head>
	<body>
		
		<div>发送消息:</div>
		<input type="text" id="msgContent"/>
		<input type="button" value="点我发送" onclick="CHAT.chat()"/>
		
		<div>接受消息:</div>
		<div id="receiveMsg" style="background-color: gainsboro;"></div>
		
		<script type="application/javascript">
			
			window.CHAT = {
				socket: null,
				init: function() {
					if (window.WebSocket) {
						CHAT.socket = new WebSocket("ws://127.0.0.1:8088/ws");
						CHAT.socket.onopen = function() {
							console.log("连接建立成功...");
						},
						CHAT.socket.onclose = function() {
							console.log("连接关闭...");
						},
						CHAT.socket.onerror = function() {
							console.log("发生错误...");
						},
						CHAT.socket.onmessage = function(e) {
							console.log("接受到消息:" + e.data);
							var receiveMsg = document.getElementById("receiveMsg");
							var html = receiveMsg.innerHTML;
							receiveMsg.innerHTML = html + "<br/>" + e.data;
						}
					} else {
						alert("浏览器不支持websocket协议...");
					}
				},
				chat: function() {
					var msg = document.getElementById("msgContent");
					CHAT.socket.send(msg.value);
				}
			};
			
			CHAT.init();
			
		</script>
	</body>
</html>

整体结构

 以上就是netty基本入门

Logo

大数据从业者之家,一起探索大数据的无限可能!

更多推荐