Netty主要是一个对NIO的封装框架

项目相关源码: https://github.com/CodePpoi/netty-code

无论是服务端和客户端通信还是聊天室的实现,其实都要有四个类,Client,ClientHandler,Server,ServerHandler,我们首先是先实现一个简单的服务端与客户端通信,基于此,再开发一个聊天室:

import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;

public class NettyClient {
    public static void main(String[] args) throws Exception {
        EventLoopGroup group = new NioEventLoopGroup();
        try {
            Bootstrap bootstrap = new Bootstrap();
            bootstrap.group(group)
                    .channel(NioSocketChannel.class) //使用其来作为channel的实现
                    .handler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel channel) {
                            channel.pipeline().addLast(new NettyClientHandler());
                        }
                    });
            System.out.println("netty client start");
            ChannelFuture channelFuture = bootstrap.connect("127.0.0.1", 9000).sync();
            channelFuture.channel().closeFuture().sync();
        } finally {
            group.shutdownGracefully();
        }
    }
}

group其实就是一个线程,bootstrap是用来启动client,以及设置一些启动的参数

channelFuture主要是用来连接到服务器,我们把server放在本地的9000端口,sync是用来等待连接完成

再看NettyClientHandler:

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.util.CharsetUtil;

public class NettyClientHandler extends ChannelInboundHandlerAdapter {
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        ByteBuf buf = Unpooled.copiedBuffer("HelloServer", CharsetUtil.UTF_8);
        ctx.writeAndFlush(buf);
    }


    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        ByteBuf buf = (ByteBuf) msg;
        System.out.println("服务端发送消息是: " + buf.toString(CharsetUtil.UTF_8));
        System.out.println("服务端地址: " + ctx.channel().remoteAddress());
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        cause.printStackTrace();
        ctx.close();
    }


}

channelActive是一个回调函数,当客户成功连接到服务器时,我们会把"HelloServer"写到缓冲区并flush,这样服务器就会接收到一条信息"HelloServer"

channelRead也是一个回调函数,当服务端发送消息给客户端时,会触发该函数,读取服务端发送到的消息,并打印到控制台

exceptionCaught用来处理异常逻辑

再看NettyServer

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;

public class NettyServer {

    public static void main(String[] args) throws Exception {
        EventLoopGroup bossGroup = new NioEventLoopGroup(1);
        EventLoopGroup workerGroup = new NioEventLoopGroup(8);
        try {
            ServerBootstrap bootstrap = new ServerBootstrap();
            bootstrap.group(bossGroup, workerGroup)
                    .channel(NioServerSocketChannel.class)
                    .option(ChannelOption.SO_BACKLOG, 1024)
                    .childHandler(new ChannelInitializer<SocketChannel>(){

                        @Override
                        protected void initChannel(SocketChannel ch) throws Exception {

                            ch.pipeline().addLast(new NettyServerHandler());
                        }
                    });
            System.out.println("netty server start...");
            ChannelFuture cf = bootstrap.bind(9000).sync();
            cf.channel().closeFuture().sync();

        } finally {
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }
    }
}

首先讲讲为什么要一个bossGroup和workerGroup,然后两个group的线程数是1和8,这其实是一个Reactor模式,单线程的Reactor其实是相当于一个简单的NIO,Reactor就相当于NIO的selector

但是这种模式,有一个问题,就是当客户端连接数很多,比如10万个,并且都是活跃的连接(比如网游)此时当有新的client想要与服务端建立连接时,因为Basic Reactor模式只有一个Reactor,而且我们处理IO操作时是对每个活跃的连接进行轮询,也就是对NIO里面的selectedKeys依次处理,这样新的client必须等我们处理完所有活跃的连接,才能连上服务端,对client的体验非常差,为了避免这种情况,一位大佬推出了多线程Reactor模型

这里面Reactor就对应主Selector(或者说代码中的bossGroup),而那个Threadpool就对应子Selectors(也就是上面代码中的workerGroup),主selector用来处理用户的新连接,子selectors用来处理读取,解码,编码,计算等操作,因为处理新连接是很快的,所以其实只要一个主线程作为selector就行。

接下来再看NettyServerHandler:

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.util.CharsetUtil;

public class NettyServerHandler extends ChannelInboundHandlerAdapter {
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        System.out.println("客户端连接通道建立完成");
    }

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        System.out.println("服务器读取线程" + Thread.currentThread().getName());
        ByteBuf buf = (ByteBuf) msg;
        System.out.println("客户端发送消息是: " + buf.toString(CharsetUtil.UTF_8));
    }

    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
        ByteBuf buf = Unpooled.copiedBuffer("HelloClient", CharsetUtil.UTF_8);
        ctx.writeAndFlush(buf);
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        cause.printStackTrace();
        ctx.close();
    }

}

这里面就讲一讲channelReadComplete,这个也是一个回调函数,当Server端读取客服端发来的信息完毕以后,Server会发送一个HelloClient给Client端。

运行NettyClient和NettyServer以后,可看到如下输出

至此服务端和客户端通信完毕

第二阶段见https://blog.csdn.net/newbaby2012/article/details/118533764

Logo

瓜分20万奖金 获得内推名额 丰厚实物奖励 易参与易上手

更多推荐