zookeeper 存储基本都是在SyncRequestProcessor 单个线程完成的


1) 初始化


1.1)DataTree初始化

public DataTree() {
        /* Rather than fight it, let root have an alias */
        nodes.put("", root);
        nodes.put(rootZookeeper, root);
                                                                                                                              
        /** add the proc node and quota node */
        root.addChild(procChildZookeeper);
        nodes.put(procZookeeper, procDataNode);
                                                                                                                              
        procDataNode.addChild(quotaChildZookeeper);
        nodes.put(quotaZookeeper, quotaDataNode);
    }


会有3个节点 /、/zookeeper、/zookeeper/quota 内容为空


注意DataTree维护了两个数据结构

  • 一个是ConcurrentHashMap<String, DataNode> nodes,所有的节点都可以在这查到

  • 一个是DataNode,DataNode包含了Set<String> children,含有父子关系

也就是说通过DataNode可以遍历到子node的路径(索引),然后通过索引去nodes查到node实例


1.2)加载snapshot 和 committedlog中的事务到内存树

【详见ZKDatabase.loadDataBase--->FileTxnSnapLog.restore】

  • 首先会加载多个snapshot到内存数中

           怎么会有多个snapshot不理解?

  •  加载committedlog

           系统会产生多个committedlog,以事务的序号作为后缀名,比如1-50个事务放在log.1, 51-100放在log.51,......

           文件名中后缀序号就是文件中第一个序号


            由于snapshotcommittedlog并非完全同步,通常情况下,committedlog信息会多于snapshot,比如snapshot记录到了第80条事务,

            但committedlog 可能记录了150条事务,因此在加载的时候,就应该联合snapshot和committedlog


            如何联合?举个例子,就拿上面的例子来说,假设共有150条事务,并产生了3个日志文件,log.1,log.51,log.100

            snapshot记录到了第80条,那么就还应该需要log.51和log.100来做合并,程序会从log.51中第81条事务开始加载

            【详见TxnIterator】


            

1.3)处理过期session

【详见ZooKeeperServer.killSession-->zkDb.killSession】

zkDb.killSession为什么没有产生binlog?



1.4 ) 产生快照见ZooKeeperServer.takeSnapshot


2)运行过程

一个专门的线程SyncRequestProcessor不断处理储存请求【详见SyncRequestProcessor.run】


以一个典型产生node的场景为例

zk.create("/root", "mydata".getBytes(), Ids.OPEN_ACL_UNSAFE,CreateMode.PERSISTENT);


2.1)获取请求并决定是否应该flush到磁盘

if (toFlush.isEmpty()) {
                    si = queuedRequests.take();
                } else {
                    si = queuedRequests.poll();
                    if (si == null) {
                        flush(toFlush);
                        continue;
                    }
                }


如果toFlush列表中没有需要flush的请求,那么就阻塞等待请求

否则已非阻塞的方式从请求队列去请求,一旦没有请求就flush


可见zookeeper选择在空闲的时候flush,这是flush的时机之一

flush的具体功能【详见SyncRequestProcessor.flush】


toFlush 见3.0)节


2.2)线程是否应该结束

if (si == requestOfDeath) {
                    break;
                }


如果请求是特殊的请求Request.requestOfDeath,那么线程结束,典型的队列哨兵模式


2.3)写log

【详见FileTxnLog.append】

将修改类的请求写入log,类似mysql的binlog


如果写入到2.4) 否则到2.5


2.4)是否应该roll log

如果是事务型的请求,那么zks.getZKDatabase().append(si)会返回true

是否是事务型的请求?

if (zks.getZKDatabase().append(si)) {
                        logCount++;
                        if (logCount > (snapCount / 2 + randRoll)) {
                            randRoll = r.nextInt(snapCount/2);
                            // roll the log
                            zks.getZKDatabase().rollLog();
                            // take a snapshot
                            if (snapInProcess != null && snapInProcess.isAlive()) {
                                LOG.warn("Too busy to snap, skipping");
                            } else {
                                snapInProcess = new Thread("Snapshot Thread") {
                                        public void run() {
                                            try {
                                                zks.takeSnapshot();
                                            } catch(Exception e) {
                                                LOG.warn("Unexpected exception", e);
                                            }
                                        }
                                    };
                                snapInProcess.start();
                            }
                            logCount = 0;
                        }
                    }


每写入一条日志,logCount++,  加到一定数量,开始roll log,

如果此时并没有在产生快照,为了不阻塞线程,会起一个临时线程产生快照,然后将logCount清0


每次产生快照都会以snap.${Long.toHexString(dataTree.lastProcessedZxid)}来命名


2.5) 是否应该直接响应

else if (toFlush.isEmpty()) {
                       // optimization for read heavy workloads
                       // iff this is a read, and there are no pending
                       // flushes (writes), then just pass this to the next
                       // processor
                       nextProcessor.processRequest(si);
                       if (nextProcessor instanceof Flushable) {
                           ((Flushable)nextProcessor).flush();
                       }
                       continue;
                   }


查看此时toFlush(响应队列)是否为空,如果为空,说明近一段时间读多写少,直接就响应了


2.6)最后的处理

toFlush.add(si);
if (toFlush.size() > 1000) {
    flush(toFlush);
}


如果没有直接响应,那么就将请求加入toFlush(响应队列),如果此时toFlush队列长度超过1000,就flush了


到此为止,toFlush(响应队列)看上去逻辑混乱,下面会专门讲,另外flush的用途也会接下来讲



3)FAQ


3.0)toFlush到底是什么?


toFlush队列用于存储请求,可能是读也可能是写

zookeeper专门使用了一个线程SyncRequestProcessor来处理请求,所以这个线程必须合理的工作,否则就会对整体的性能造成伤害

如果都是读请求就没必要toFlush了,但如果是写请求,就必须把请求写入log,这个写入未必能保证真的同步到磁盘,但如果每次写请求都同步,

性能会有问题,所以从程序的设计应该能看到作者应该是处于这个考虑选择了两个时机来做这件事情

  • 如果没有请求的时候(即较空闲的时候)

  • toFlush队列到了一定数量(1000),就会批量同步

可以看到的一些问题

  • 由于要选择合适的时机flush,客户端的响应会受到影响,为什么不考虑分离磁盘同步和响应客户端?为了更严谨?

  • 如果写多读少,写会干扰读,因为所有的写都会加入到toFlush队列,而如果toFlush队列不为空,读也会放进去,正如上面提到的,toFlush并不会立即响应


3.1 )flush干了什么?怎么实现的?

private void flush(LinkedList<Request> toFlush) throws IOException {
        if (toFlush.isEmpty())
            return;
                                                  
        zks.getZKDatabase().commit();
        while (!toFlush.isEmpty()) {
            Request i = toFlush.remove();
            nextProcessor.processRequest(i);
        }
        if (nextProcessor instanceof Flushable) {
            ((Flushable)nextProcessor).flush();
        }
    }


  • 同步磁盘zks.getZKDatabase().commit()

  • 调用nextProcessor的方法响应网络请求,

     通常情况下nextProcessorFinalRequestProcessor且与Flushable没有关系

     所以只需关注FinalRequestProcessor.processRequest方法,下面会提到


3.2)zks.getZKDatabase().commit() 干了什么?

zks.getZKDatabase().commit()实际调用了FileTxnLog.commit,代码如下

public synchronized void commit() throws IOException {
       if (logStream != null) {
           logStream.flush();
       }
       for (FileOutputStream log : streamsToFlush) {
           log.flush();
           if (forceSync) {
               long startSyncNS = System.nanoTime();
                                                         
               log.getChannel().force(false);
                                                         
               long syncElapsedMS =
                   TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startSyncNS);
               if (syncElapsedMS > fsyncWarningThresholdMS) {
                   LOG.warn("fsync-ing the write ahead log in "
                           + Thread.currentThread().getName()
                           + " took " + syncElapsedMS
                           + "ms which will adversely effect operation latency. "
                           + "See the ZooKeeper troubleshooting guide");
               }
           }
       }
       while (streamsToFlush.size() > 1) {
           streamsToFlush.removeFirst().close();
       }
   }



  • 将当前流logStream flush

  • 将之前append的流streamsToFlush逐个flush;forceSync默认为true,因此会调用log.getChannel().force同步到磁盘

  • 完成之后,会将streamsToFlush中的流删除

上面的flush和getChannel().force的差别?

一个是应用级一个是磁盘级,当调用flush会将应用数据缓冲区中的全部提交给磁盘驱动去调度,但此时也未必全部同步到磁盘

磁盘写一般是异步的,所以后者会保证全部同步到磁盘,类似操作系统的API fsync


3)FinalRequestProcessor 如何实现的?


3.3.1)处理outstandingChanges

synchronized (zks.outstandingChanges) {
            while (!zks.outstandingChanges.isEmpty()
                    && zks.outstandingChanges.get(0).zxid <= request.zxid) {
                ChangeRecord cr = zks.outstandingChanges.remove(0);
                if (cr.zxid < request.zxid) {
                    LOG.warn("Zxid outstanding "
                            + cr.zxid
                            + " is less than current " + request.zxid);
                }
                if (zks.outstandingChangesForPath.get(cr.path) == cr) {
                    zks.outstandingChangesForPath.remove(cr.path);
                }
            }
            if (request.hdr != null) {
               TxnHeader hdr = request.hdr;
               Record txn = request.txn;
                            
               rc = zks.processTxn(hdr, txn);
            }
            // do not add non quorum packets to the queue.
            if (Request.isQuorum(request.type)) {
                zks.getZKDatabase().addCommittedProposal(request);
            }
        }


  • outstandingChanges是一个事务型请求的冗余队列,一旦处理完相关的事物请求,需要将outstandingChanges的相关item删除

  • 处理事务型请求,主要是涉及datatree的写操作

  • 最后会将请求放入ZKDatabase的committedLog,便于集群中的其他机器快速同步

3.3.2)构造响应

接下来开始一长段的switch/case根据请求类型进行处理,基本都会构造响应

以一个事务型请求为例

case OpCode.setData: {
                lastOp = "SETD";
                rsp = new SetDataResponse(rc.stat);
                err = Code.get(rc.err);
                break;
            }


再看看一个非事务型请求的例子

case OpCode.exists: {
               lastOp = "EXIS";
               // TODO we need to figure out the security requirement for this!
               ExistsRequest existsRequest = new ExistsRequest();
               ByteBufferInputStream.byteBuffer2Record(request.request,
                       existsRequest);
               String path = existsRequest.getPath();
               if (path.indexOf('\0') != -1) {
                   throw new KeeperException.BadArgumentsException();
               }
               Stat stat = zks.getZKDatabase().statNode(path, existsRequest
                       .getWatch() ? cnxn : null);
               rsp = new ExistsResponse(stat);
               break;
           }


由于在3.1中已完成对事务型请求的处理,所以本阶段中事务型请求无需再处理

如果是非事务型需要处理,通常就是操作一下datatree获取数据


3.3.3)网络响应

ReplyHeader hdr =
            new ReplyHeader(request.cxid, request.zxid, err.intValue());
                 
        zks.serverStats().updateLatency(request.createTime);
        cnxn.updateStatsForResponse(request.cxid, request.zxid, lastOp,
                    request.createTime, System.currentTimeMillis());
                 
        try {
            cnxn.sendResponse(hdr, rsp, "response");
            if (closeSession) {
                cnxn.sendCloseSession();
            }
        } catch (IOException e) {
            LOG.error("FIXMSG",e);
        }


构造一个ReplyHeader,并且响应见cnxn.sendResponse


3.4)snapshot产生时机及命名规则?


3.4.1)3个时机

  • 加载的时候,用于merge快照和事务日志

  • 正如上面提到的SyncRequestProcessor线程当事务日志多于一定数量且较空闲

  • follower线程接受到leader线程相关指令

3.4.2)命名规则

public static String makeSnapshotName(long zxid) {
        return "snapshot." + Long.toHexString(zxid);
    }



4)小结

  • 服务器包含两个重要日志:快照以及事务日志

  • 服务器使用单线程来处理所有请求(和存储相关)

  • 服务器选择了合适的时机产生快照以及roll事务日志,避免阻塞

  • 和存储(写磁盘)相关的方法都是同步的(synchronized),虽然一个线程在操作

  • 如果服务器崩溃了,日志还没有flush掉,数据会丢失(至少单机的时候是这样)

  • 所有的读请求直接从内存走和磁盘无关,但响应速度会受一些影响见上面的3.0)

  • 接下来的几个问题还需跟踪

    • 为什么会同时加载多个snapshot? 1个最近的snapshot不就够了?

    • sessiontrack的机制?ZKDatabase中的sessionsWithTimeouts与sessiontrack的关系

    • 集群环境下又会有什么变化?




Logo

权威|前沿|技术|干货|国内首个API全生命周期开发者社区

更多推荐