手把手带你撸zookeeper源码-zookeeper确定好角色后会做什么?
接上文手把手带你撸zookeeper源码-zookeeper启动(五)leader选举投票归档-确认当前zk服务的角色上篇文章主要分析了leader选举的最终阶段,根据zk集群的相互投票之后,进行投票归档,然后判断某个zk获得投票数是否大于集群数量的一半,如果未超过一半,则继续下一轮的投票选举,如果有某个zk有超过一半的选票,则leader确定,然后其他zk服务则为follower或者observ
接上文 手把手带你撸zookeeper源码-zookeeper启动(五)leader选举投票归档-确认当前zk服务的角色
上篇文章主要分析了leader选举的最终阶段,根据zk集群的相互投票之后,进行投票归档,然后判断某个zk获得投票数是否大于集群数量的一半,如果未超过一半,则继续下一轮的投票选举,如果有某个zk有超过一半的选票,则leader确定,然后其他zk服务则为follower或者observer
不知道大家有没有搭建过三台zk的集群,如果你是按照myid从小到大依次启动三台服务器,一般情况下都是第二台服务器是会成为leader的,相信大家看了之前的代码分析应该能知道为什么了吧?
接下来还会有很多篇文章来剖析zookeeper集群之间的数据如何基于2pc的方式来进行数据同步,客户端和服务端的会话如何创建和维护、zookeeper内部的数据结构是如何保存的?如何创建、删除临时节点、持久节点、顺序节点,zookeeper的监听回调通知如何实现的,以及zookeeper的故障恢复是如何做的?东西还有很多,我们一一去剖析,一步一个脚印的学习
今天这篇文章我们来分析一下,既然leader已经选举出来了,接下来会做什么呢?
上篇文章我们分析完了FastLeaderElection#lookForLeader这个方法,它返回了当前一个Vote对象,确定了leader、follower、observer的角色,此方法也就结束了。
if (n == null) {
//如果当前服务器是的sid和投票为leader的sid一样
//则设置peerState状态为LEADING,否则要么为FOLLOWING或者OBSERVING
self.setPeerState((proposedLeader == self.getId()) ?
ServerState.LEADING: learningState());
Vote endVote = new Vote(proposedLeader,
proposedZxid,
logicalclock.get(),
proposedEpoch);
leaveInstance(endVote);
return endVote;
}
我们接着看调用这行代码的地方,在QuorumPeer中的run方法中,把上面返回的Vote对象赋值给currentVote,接着会进行下一轮的循环,此时zk的角色都已经确定了,然后会进入到响应的分支当中
我们一个个看,如果peerState == LEADING,则进入如下分支
case LEADING:
LOG.info("LEADING");
try {
setLeader(makeLeader(logFactory));
leader.lead();
setLeader(null);
} catch (Exception e) {
LOG.warn("Unexpected exception",e);
} finally {
if (leader != null) {
leader.shutdown("Forcing shutdown");
setLeader(null);
}
setPeerState(ServerState.LOOKING);
}
break;
我们今天主要分析一下看看leader角色zookeeper主要做了什么,我们一步步来分析
setLeader(makeLeader(logFactory));
这一行代码主要是创建一个Leader对象,并且把创建出来的leader对象赋值给当前leader变量,下面会调用leader.lead(),这个方法很重要,等会分析,我们看看创建Leader对象都干了啥
protected Leader makeLeader(FileTxnSnapLog logFactory) throws IOException {
return new Leader(this, new LeaderZooKeeperServer(logFactory,
this,new ZooKeeperServer.BasicDataTreeBuilder(), this.zkDb));
}
logFactory这个参数就是解析完zoo.cfg文件之后,把属性封装到了QuorumPeerConfig对象中,这个对象中肯定有我们配置的日志目录,和数据目录,然后根据这两个目录会创建一个FileTxnSnapLog对象传递给QuorumPeer对象,大家可以看一下之前的代码
然后创建了LeaderZookeeperServer对象,在这个方法里面会有一系列的调用链来处理客户端发送过来的请求, 如2PC阶段如何处理的
new ZooKeeperServer.BasicDataTreeBuilder()
static public class BasicDataTreeBuilder implements DataTreeBuilder {
public DataTree build() {
return new DataTree();
}
}
猜想一下,BasicDataTreeBuilder对象提供了一个build()方法,返回了一个DataTree对象,这个DataTree肯定就是zookeeper内纯的数据结构对象,然后会在某个地方会调用这个build方法来创建一个DataTree内纯对象, 然后客户端发送增删改查节点的时候肯定就是操作的这个数据结构,是不是这样的,我们之后再具体分析
最后创建一个Leader对象
Leader(QuorumPeer self,LeaderZooKeeperServer zk) throws IOException {
this.self = self;
this.proposalStats = new ProposalStats();
if (self.getQuorumListenOnAllIPs()) {
ss = new ServerSocket(self.getQuorumAddress().getPort());
} else {
ss = new ServerSocket();
}
ss.setReuseAddress(true);
if (!self.getQuorumListenOnAllIPs()) {
ss.bind(self.getQuorumAddress());
}
this.zk=zk;
}
在我们的配置文件中有server.x=zk01:2888:3888这样的配置,在这段代码中,就是对当前服务器的2888端口进行监听,等待其他follower和当前leader进行连接
创建完Leader对象之后,最终会调用leader.lead()方法
void lead() throws IOException, InterruptedException {
self.end_fle = Time.currentElapsedTime();
//记住leader选举的时间差
long electionTimeTaken = self.end_fle - self.start_fle;
self.setElectionTimeTaken(electionTimeTaken);
LOG.info("LEADING - LEADER ELECTION TOOK - {}", electionTimeTaken);
self.start_fle = 0;
self.end_fle = 0;
//JMX监控
zk.registerJMX(new LeaderBean(this, zk), self.jmxLocalPeerBean);
try {
self.tick.set(0);
//加载硬盘上的数据到内存中
zk.loadData();
leaderStateSummary = new StateSummary(self.getCurrentEpoch(), zk.getLastProcessedZxid());
// Start thread that waits for connection requests from
// new followers.
cnxAcceptor = new LearnerCnxAcceptor();
cnxAcceptor.start();
readyToStart = true;
long epoch = getEpochToPropose(self.getId(), self.getAcceptedEpoch());
zk.setZxid(ZxidUtils.makeZxid(epoch, 0));
synchronized(this){
lastProposed = zk.getZxid();
}
newLeaderProposal.packet = new QuorumPacket(NEWLEADER, zk.getZxid(),
null, null);
if ((newLeaderProposal.packet.getZxid() & 0xffffffffL) != 0) {
LOG.info("NEWLEADER proposal has Zxid of "
+ Long.toHexString(newLeaderProposal.packet.getZxid()));
}
waitForEpochAck(self.getId(), leaderStateSummary);
self.setCurrentEpoch(epoch);
try {
waitForNewLeaderAck(self.getId(), zk.getZxid());
} catch (InterruptedException e) {
shutdown("Waiting for a quorum of followers, only synced with sids: [ "
+ getSidSetString(newLeaderProposal.ackSet) + " ]");
HashSet<Long> followerSet = new HashSet<Long>();
for (LearnerHandler f : learners)
followerSet.add(f.getSid());
if (self.getQuorumVerifier().containsQuorum(followerSet)) {
LOG.warn("Enough followers present. "
+ "Perhaps the initTicks need to be increased.");
}
Thread.sleep(self.tickTime);
self.tick.incrementAndGet();
return;
}
startZkServer();
String initialZxid = System.getProperty("zookeeper.testingonly.initialZxid");
if (initialZxid != null) {
long zxid = Long.parseLong(initialZxid);
zk.setZxid((zk.getZxid() & 0xffffffff00000000L) | zxid);
}
if (!System.getProperty("zookeeper.leaderServes", "yes").equals("no")) {
self.cnxnFactory.setZooKeeperServer(zk);
}
boolean tickSkip = true;
while (true) {
Thread.sleep(self.tickTime / 2);
if (!tickSkip) {
self.tick.incrementAndGet();
}
HashSet<Long> syncedSet = new HashSet<Long>();
// lock on the followers when we use it.
syncedSet.add(self.getId());
for (LearnerHandler f : getLearners()) {
if (f.synced() && f.getLearnerType() == LearnerType.PARTICIPANT) {
syncedSet.add(f.getSid());
}
f.ping();
}
// check leader running status
if (!this.isRunning()) {
shutdown("Unexpected internal error");
return;
}
if (!tickSkip && !self.getQuorumVerifier().containsQuorum(syncedSet)) {
return;
}
tickSkip = !tickSkip;
}
} finally {
zk.unregisterJMX(this);
}
}
先分析第一个关键点
cnxAcceptor = new LearnerCnxAcceptor();
cnxAcceptor.start();
LeanerCnxAcceptor是一个线程,这里开启一个线程,用来监听其他follower来和当前leader进行连接的,我们看一下LeanerCnxAcceptor.run方法
@Override
public void run() {
try {
while (!stop) {
try{
Socket s = ss.accept();
s.setSoTimeout(self.tickTime * self.initLimit);
s.setTcpNoDelay(nodelay);
BufferedInputStream is = new BufferedInputStream(
s.getInputStream());
LearnerHandler fh = new LearnerHandler(s, is, Leader.this);
fh.start();
} catch (SocketException e) {
if (stop) {
stop = true;
} else {
throw e;
}
} catch (SaslException e){
LOG.error("Exception while connecting to quorum learner", e);
}
}
} catch (Exception e) {
LOG.warn("Exception while accepting follower", e);
}
}
这里通过ServerSocket.accept()进行阻塞等待其他follower进行连接,如果有连接进来之后则会创建一个LearnerHandler对象,它也是一个线程,然后把socket交给LearnerHandler,由它来处理和连接进来的socket进行通信处理,这里也是用的阻塞bio,每当有一个follower或者observer进来连接时,都会创建一个单独线程去处理连接以及发送的数据
我们可以进入到LearnerHandler.run方法中去看看,现在有某个follower连接进来了,然后其对应的socket交给了LearnerHandler,接下来就是读取socket中的数据
先截取一部分代码看一下
leader.addLearnerHandler(this);
tickOfNextAckDeadline = leader.self.tick.get()
+ leader.self.initLimit + leader.self.syncLimit;
ia = BinaryInputArchive.getArchive(bufferedInput);
bufferedOutput = new BufferedOutputStream(sock.getOutputStream());
oa = BinaryOutputArchive.getArchive(bufferedOutput);
QuorumPacket qp = new QuorumPacket();
ia.readRecord(qp, "packet");//读取follower发送过来的注册数据包
if(qp.getType() != Leader.FOLLOWERINFO && qp.getType() != Leader.OBSERVERINFO){
LOG.error("First packet " + qp.toString()
+ " is not FOLLOWERINFO or OBSERVERINFO!");
return;
}
这块代码中用到了jute序列化协议,从输入流中读取数据,然后对数据进行反序列化,Jute序列化协议是zookeeper内部封装的一个序列化协议,然后通过特定的格式进行数据传输,从而解决传输过程当中可能出现的粘包拆包问题
OK,做个总结,今天大概梳理了一下,当前zk如果是leader时,会创建一个Leader对象,在创建Leader对象时会创建一个LeaderZookeeperServer,这个对象里面封装了一系列的RequestProcessor调用链,还有一个BasicDataTreeBuilder,其中的build方法返回一个DataTree对象。最后创建Leader对象时会创建ServerSocket然后监听端口等待其他Follower或者Observer来进行连接。所有的连接都会交给一个线程LearnerHandler进行处理,然后读取follower发送过来的数据,通过jute进行序列化反序列化
每篇文章不用太长,大家把一个点学会学透,每天进步一点点
下篇文章先讲一下Jute序列化协议是如何进行序列化反序列化的,以及它的格式是什么样的,怎么解决粘包拆包的问题的
更多推荐
所有评论(0)