Java IO:SocketChannel和Selector在ZooKeeper中应用
转载请注明出处:jiq•钦's technical Blog 如果不了解SocketChannel和Selector,请先阅读我的另一篇博文:http://blog.csdn.net/jiyiqinlovexx/article/details/46780555 ZooKeeper的启动从QuorumPeerMain类的main函数开始: 调用顺序是: Main -> initializeAndRu
转载请注明出处:jiq•钦's technical Blog
如果不了解SocketChannel和Selector,请先阅读我的另一篇博文:点击打开链接
ZooKeeper的启动从QuorumPeerMain类的main函数开始:
调用顺序是: Main -> initializeAndRun-> runFromConfig
一、默认的NIOServerCnxnFactory通信方式
其中runFromConfig主要做了两件事情:
(1)初始化客户端与服务端的网络通信处理类ServerCnxnFactory:
ServerCnxnFactory cnxnFactory = ServerCnxnFactory.createFactory();
而createFactory函数的内部实现是:
if (serverCnxnFactoryName == null) {
serverCnxnFactoryName = NIOServerCnxnFactory.class.getName();
}
try {
return(ServerCnxnFactory) Class.forName(serverCnxnFactoryName).newInstance();
} catch (Exception e) {
IOException ioe = new IOException("Couldn't instantiate "
+ serverCnxnFactoryName);
ioe.initCause(e);
throw ioe;
}
可以看到默认初始化的
ServerCnxnFactory是
NIOServerCnxnFactory,这是Java NIO方式的网络通信,另外还有
NettyServerCnxnFactory
类提供Netty通信方式。
(2)启动QuorumPeer:
quorumPeer.start();
内部会调用初始化好的ServerCnxnFactory类的start函数:
cnxnFactory.start();
二、NIOServerCnxnFactory.start()方法
@Override
publicvoidstart() {
stopped = false;
if (workerPool == null){
workerPool = new WorkerService(
"NIOWorker", numWorkerThreads, false);
}
//先启动一堆selector线程
for(SelectorThread thread : selectorThreads) {
if (thread.getState() == Thread.State.NEW) {
thread.start();
}
}
//再启动accept线程
if (acceptThread.getState() == Thread.State.NEW) {
acceptThread.start();
}
//最后启动expire线程
if (expirerThread.getState() ==Thread.State.NEW){
expirerThread.start();
}
}
三、NIOServerCnxnFactory中几个线程类的关系
NIOServerCnxnFactory中包含了三个类:
(1)AbstractSelectThread:SelectorThread类和AcceptThread类的共同父类,维护了一个selector选择器对象。
(2)AcceptThread:管理新的ZooKeeper client连接请求,实际上就是利用父类的选择器selector监听ServerSocketChannel的“SelectionKey.OP_ACCEPT”事件,一旦来新的请求,负责建立好和客户端的连接SocketChannel,并从SelectorThread线程池分配一个线程,将该SocketChannel连接放入SelectorThread线程维护的acceptedQueue队列中。
(3)SelectorThread:监听AcceptThread分配的已经建立的SocketChannel连接上发生的数据读写事件,并执行实际数据读写。实际上就是利用父类的选择器selector监听在acceptedQueue队列中建立好的连接的数据读写事件,一旦读写事件发生,调用handleIO函数处理读写请求。
下面是读写事件监听线程的线程池selectorThreads的初始化:
for(inti=0; i<numSelectorThreads;++i) {
selectorThreads.add(newSelectorThread(i));
}
下面是新连接管理线程 acceptThread 的初始化:
this.ss =ServerSocketChannel.open();
ss.socket().setReuseAddress(true);
LOG.info("binding to port " + addr);
ss.socket().bind(addr);
ss.configureBlocking(false);
acceptThread= newAcceptThread(ss, addr, selectorThreads);
然后NIOServerCnxnFactory.start()将会启动这些线程。
AcceptThread的run函数很简单,就是监听连接SelectionKey.op_accept事件,然后建立SocketChannel连接,并分配一个SelectorThread线程来处理该连接,具体就不详述了。
SelectorThread的run函数核心就是监听到SelectionKey.OP_READ事件后执行handleIO函数,具体如何和客户端进行数据交互计划放在其他文章中介绍。
更多推荐
所有评论(0)