netty中epoll server和nio server的使用


这几天有空研究了下netty中的EpollEventLoopGroupNioEventLoopGroup的用法,在编码上没有显著的不同,对应的epoll,有一套的api供于使用,但是因为只能在linux机上使用,因此又借助了docker运行linux容器来运行相应程序,这节就来具体的讲述下。

nio server


编写了一个简单的Hello world的http server,不讲述详细代码了,只讲下最后的server中的部分源码,我采用的netty的版本是netty 4.0的,在这就不再使用netty5了,netty5因为一些更为复杂的特性和没有显著的提高性能已经被放弃了,这里就不再提了。

HttpHelloWorldServerHandler:

package cn.com.epoll;

import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.handler.codec.http.*;
import io.netty.util.AsciiString;

/**
 * Created by xiaxuan on 17/11/14.
 */
public class HttpHelloWorldServerHandler extends ChannelInboundHandlerAdapter {
    private static final byte[] CONTENT = { 'H', 'e', 'l', 'l', 'o', ' ', 'W', 'o', 'r', 'l', 'd'};

    private static final AsciiString CONTENT_TYPE = new AsciiString("Content-Type");
    private static final AsciiString CONTENT_LENGTH = new AsciiString("Content-Length");
    private static final AsciiString CONNECTION = new AsciiString("Connection");
    private static final AsciiString KEEP_ALIVE = new AsciiString("keep-alive");

    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
        ctx.flush();
    }

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        if (msg instanceof HttpRequest) {
            HttpRequest req = (HttpRequest) msg;

            if (HttpUtil.is100ContinueExpected(req)) {
                ctx.write(new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.CONTINUE));
            }
            boolean keepAlive = HttpUtil.isKeepAlive(req);
            FullHttpResponse response = new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.OK, Unpooled.wrappedBuffer(CONTENT));
            response.headers().set(CONTENT_TYPE, "text/plain");
            response.headers().set(CONTENT_LENGTH, response.content().readableBytes());

            if (!keepAlive) {
                ctx.write(response).addListener(ChannelFutureListener.CLOSE);
            } else {
                response.headers().set(CONNECTION, KEEP_ALIVE);
                ctx.write(response);
            }
        }
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        cause.printStackTrace();
        ctx.close();
    }
}

channelRead的方法很简单,就是判断当前是不是http请求,是的话就输出Hello World,功能比较简单。

HttpHelloWorldServerInitializer:

public class HttpHelloWorldServerInitializer extends ChannelInitializer<SocketChannel> {
    @Override
    protected void initChannel(SocketChannel ch) throws Exception {
        ch.pipeline().addLast(new HttpServerCodec());
        ch.pipeline().addLast(new HttpHelloWorldServerHandler());
    }
}

pipline添加Handler处理。

NioHttpHelloWorldServer:

package cn.com.epoll;

import io.netty.bootstrap.ServerBootstrap;
import io.netty.buffer.PooledByteBufAllocator;
import io.netty.channel.Channel;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;

/**
 * Created by xiaxuan on 17/11/14.
 */
public class NioHttpHelloWorldServer {

    private static final int PORT = Integer.parseInt(System.getProperty("port", "8080"));

    public static void main(String[] args) {
        EventLoopGroup bossGroup = new NioEventLoopGroup(1);
        EventLoopGroup workerGroup = new NioEventLoopGroup();
        ServerBootstrap b = new ServerBootstrap();
        try {
            b.group(bossGroup, workerGroup)
                    .channel(NioServerSocketChannel.class)
                    .option(ChannelOption.SO_BACKLOG, 1024)
                    .childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)
                    .childHandler(new HttpHelloWorldServerInitializer());

            Channel ch = b.bind(PORT).channel();
            ch.closeFuture().sync();
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }

    }
}

普通的netty演示程序的写法,没什么特殊的,但是还是需要提一下其中使用的NioEventLoopGroup,NioEventLoopGroup就是一个简单的线程池调度服务,我们再追溯NioEventLoopGroup的源码的时候可以发现最终NioEventLoopGroup继承的就是ScheduledExecutorService,就是有多个NioEventLoop对象的线程池,如果不指定线程池的的容量的话,默认就是当前cpu * 2的数量,转到源码可以看到传入的构造函数为0,为以下:

 * Create a new instance using the default number of threads, the default {@link ThreadFactory} and
     * the {@link SelectorProvider} which is returned by {@link SelectorProvider#provider()}.
     */
    public NioEventLoopGroup() {
        this(0);
    }

但是转到最终源码会发现当判断为0的时候,去了默认值,源码如下:

 static {
        DEFAULT_EVENT_LOOP_THREADS = Math.max(1, SystemPropertyUtil.getInt(
                "io.netty.eventLoopThreads", NettyRuntime.availableProcessors() * 2));

        if (logger.isDebugEnabled()) {
            logger.debug("-Dio.netty.eventLoopThreads: {}", DEFAULT_EVENT_LOOP_THREADS);
        }
    }

    /**
     * @see MultithreadEventExecutorGroup#MultithreadEventExecutorGroup(int, Executor, Object...)
     */
    protected MultithreadEventLoopGroup(int nThreads, Executor executor, Object... args) {
        super(nThreads == 0 ? DEFAULT_EVENT_LOOP_THREADS : nThreads, executor, args);
    }

如上,最终取得默认值就是cpu * 2。

而这里最终还是需要提一下NioEventLoop,NioEventLoop的父类继承了SingleThreadEventExecutor,也是一个线程池调度服务,但是只有一个单线程,在NioEventLoop创建的时候,同时也会创建一个Selector,selector管理channel,所以实际上NioEventLoopGroup就是一组管理Channel的线程池。

源码解析就到此未知,运行程序,在web端请求的效果如下:

十分简单,这是nio server的运行。

epoll server


epoll server的源码主要在server上的不同,其他的与上相同,server如下:

package cn.com.epoll;

import io.netty.bootstrap.ServerBootstrap;
import io.netty.buffer.PooledByteBufAllocator;
import io.netty.channel.Channel;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.epoll.EpollEventLoopGroup;
import io.netty.channel.epoll.EpollServerSocketChannel;

/**
 * Created by xiaxuan on 17/11/14.
 */
public class HttpHelloWorldServer {

    static final int PORT = Integer.parseInt(System.getProperty("port", "8080"));

    public static void main(String[] args) {
        EventLoopGroup bossGroup = new EpollEventLoopGroup(1);
        EventLoopGroup workerGroup = new EpollEventLoopGroup();

        try {
            ServerBootstrap b = new ServerBootstrap();
            b.channel(EpollServerSocketChannel.class);
            b.option(ChannelOption.SO_BACKLOG, 1024);
            b.childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT);
            b.group(bossGroup, workerGroup)
                    .childHandler(new HttpHelloWorldServerInitializer());
            Channel ch = b.bind(PORT).sync().channel();
            ch.closeFuture().sync();
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }
    }
}

暂时先不讲Epoll和nio selector特性的不同,首先先把这里的应用程序讲完,在普通的windows或者mac上运行当前的程序会运行不起来的,会报错为:

Exception in thread "main" java.lang.UnsatisfiedLinkError: failed to load the required native library
    at io.netty.channel.epoll.Epoll.ensureAvailability(Epoll.java:78)
    at io.netty.channel.epoll.EpollEventLoopGroup.<clinit>(EpollEventLoopGroup.java:38)
    at cn.com.epoll.HttpHelloWorldServer.main(HttpHelloWorldServer.java:19)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:483)
    at com.intellij.rt.execution.application.AppMain.main(AppMain.java:147)
Caused by: java.lang.ExceptionInInitializerError
    at io.netty.channel.epoll.Epoll.<clinit>(Epoll.java:33)
    ... 7 more
Caused by: java.lang.IllegalStateException: Only supported on Linux
    at io.netty.channel.epoll.Native.loadNativeLibrary(Native.java:189)
    at io.netty.channel.epoll.Native.<clinit>(Native.java:61)
    ... 8 more

epoll模型只有在linux kernel 2.6以上才能支持,在windows和mac都是不支持的,因此需要在linux上运行这个程序,但是本机是mac系统,因此不能在本地运行,然后本地也没有安装linux虚拟机,因此便借助了docker来使程序运行,于此同时为了方便运行maven打出的jar包,借助了一个maven插件以供打出一个可执行的jar包,插件如下:

<plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-shade-plugin</artifactId>
                <version>2.3</version>
                <executions>
                    <execution>
                        <phase>package</phase>
                        <goals>
                            <goal>shade</goal>
                        </goals>
                        <configuration>
                            <transformers>
                                <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                                    <mainClass>${exec.mainClass}</mainClass>
                                </transformer>
                            </transformers>
                            <artifactSet>
                            </artifactSet>
                        </configuration>
                    </execution>
                </executions>
            </plugin>

然后编写的Dockerfile文件如下:

FROM java:8
MAINTAINER bingwenwuhen bingwenwuhen@163.com

RUN mkdir /app
COPY target/epoll-server-1.0-SNAPSHOT.jar /app

ENTRYPOINT ["java", "-jar", "app/epoll-server-1.0-SNAPSHOT.jar"]
EXPOSE 8080

我拉取的这个java:8基础镜像就是以centos为基础镜像,因此就是linux环境,在可能一些基础镜像上并非支持epoll模型的可能性上,可以直接拉取centos镜像,然后配置java环境等等,在此不再详述,这个在网上有着足够的资料。

在maven编译打包之后,docker进行镜像构建,运行,最后执行docker ps命令,查看容器运行情况,如下:

容器正常运行,使用curl命令请求服务,将会获取如上一样的结果,epoll server正常运行。

我们在查看EpollEventLoopGroup源码的时候可以发现,NioEventLoopGroup和EpollEventgroup最终继承的类都是相同,只是部分特性不同而已,因此在这就不再讲述EpollEventLoopGroup的源码,然而epoll模型本身讲述起来又相当复杂,不是本节能够讲述清楚的,对于EpollEventLoopGroup与epoll模型,以后有空再做专题详述。

源码下载地址


NioEventLoopGroup的源码下载地址就不再给出,下面是epoll server的源码下载地址:

源码下载地址

Logo

权威|前沿|技术|干货|国内首个API全生命周期开发者社区

更多推荐