源代码:

http://svn.apache.org/repos/asf/zookeeper/trunk/

 

导入eclipse:

在包含build.xml目录下执行ant eclipse将产生.classpath文件

 

目录结构:

src/recipes:提供了各种Zookeeper应用例子

src/c:提供了c版客户端。zookeeper_st,zookeeper_mt两个library

src/contrib:别人贡献的代码?

src/generated:由jute生成的java实体类

 

客户端入口:org.apache.zookeeper.ZooKeeperMain

 

//读取命令行输入,用MyCommandOptions解析。

//内部类MyCommandOptions包含成员命令名command、参数列表cmdArgs

-option value –option value command cmdArgs

 

//根据以上解析的ip、端口,连接到ZooKeeper

        zk = newZooKeeper(host,

                Integer.parseInt(cl.getOption("timeout")),

                 newMyWatcher(), readOnly);

 

//执行命令,在ZooKeeperMain.run()

//ZooKeeperMain只是一个外壳,使用jline实现了命令提示功能。

//commandMapCli将提供的命令命令名与执行体CliCommand关联

        //execute from commandMap

        CliCommandcliCmd = commandMapCli.get(cmd);

        if(cliCmd!=null) {

           cliCmd.setZk(zk);

            watch =cliCmd.parse(args).exec();

                   }

 

//最终转到调用ZooKeeper方法

//提供的命令:

quit:Zk.close()关闭zk连接,调用cnxn.close()

history:列出历史记录

redo index:重新执行历史记录

printwatches [on]:查看/设置watche开关状态

connect:connectToZK(host)连接zk


//ZooKeeper内部连接

        cnxn = newClientCnxn(connectStringParser.getChrootPath(),

                hostProvider,sessionTimeout,this,watchManager,

                getClientCnxnSocket(),canBeReadOnly);

        cnxn.start();

 

 

ClientCnxn包含SendThread和EventThread两个线程

SendThread将事件添加到waitEvents队列中,EventThread线程消费该队列。


//下面以ls命令为例

//调用zk.getChildren

    public boolean exec() throwsKeeperException, InterruptedException {

        String path= args[1];

        boolean watch =cl.hasOption("w");

       List<String> children = zk.getChildren(path, watch);

        out.println(children);

        return watch;

    }


//getChildren生成request

       RequestHeader h = newRequestHeader();

       h.setType(ZooDefs.OpCode.getChildren);

       GetChildrenRequest request = newGetChildrenRequest();

       request.setPath(serverPath);

       request.setWatch(watcher != null);

       GetChildrenResponse response = newGetChildrenResponse();

        ReplyHeader r = cnxn.submitRequest(h, request,response, wcb);

 

 

//submitRequest调用queuePacket,阻塞直到收到response。

    publicReplyHeadersubmitRequest(RequestHeaderh, Record request,

            Recordresponse, WatchRegistration watchRegistration)

            throwsInterruptedException {

        ReplyHeaderr = new ReplyHeader();

        Packetpacket = queuePacket(h,r, request, response,null,null,null,

                   null, watchRegistration);

        synchronized(packet) {

            while (!packet.finished) {

               packet.wait();

            }

        }

        return r;

    }


//queuePacket将Packet添加到outgoingQueue队列中

            packet= new Packet(h, r, request, response,watchRegistration);

            packet.cb = cb;

            packet.ctx = ctx;

            packet.clientPath =clientPath;

            packet.serverPath =serverPath;

 

                outgoingQueue.add(packet);

 

         //然后唤醒selector

        sendThread.getClientCnxnSocket().wakeupCnxn();



//sendThread.run消费outgoingQueue

        clientCnxnSocket.doTransport(to,pendingQueue,outgoingQueue,ClientCnxn.this);

 

 

//selector判断读/写事件

//doTransport调用doIO,doIO解析Response

         //读事件

        int rc =sock.read(incomingBuffer);

        sendThread.readResponse(incomingBuffer);

         //写事件

       sock.write(p.bb);


//readResponse

//收到的package包括header,token, response字段

// 当header中xid是-1时,收到的notification,response字段解析得到WatchedEvent

// event将放入eventThread事件队列watingEvents

     WatcherEventevent = new WatcherEvent();

    event.deserialize(bbia, "response");

     …

     eventThread.queueEvent(we );

//在finally块中调用finishPacket

    try {

        packet.replyHeader.setXid(replyHdr.getXid());

        packet.replyHeader.setErr(replyHdr.getErr());

        packet.replyHeader.setZxid(replyHdr.getZxid());

        if(replyHdr.getZxid() > 0) {

            lastZxid =replyHdr.getZxid();

        }

        if(packet.response != null&& replyHdr.getErr() == 0) {

            packet.response.deserialize(bbia, "response");

        }

    } finally {

        finishPacket(packet);

    }

 

 

//finishPacket

//如果有回调,finishPacket也将packet放到eventThread的队列中

//否则设置packet.finish,此时submitRequest返回response。

    if (p.cb == null) {

        synchronized (p) {

            p.finished = true;

           p.notifyAll();

        }

    } else {

        p.finished = true;

        eventThread.queuePacket(p);

    }

 

 

//EventThread.run线程消费waintingEvents

Objectevent = waitingEvents.take();

processEvent(event);

 



以下图片转自:http://www.spnguru.com/2010/08/zookeeper%E5%85%A8%E8%A7%A3%E6%9E%90%E2%80%94%E2%80%94client%E7%AB%AF/

 


Logo

权威|前沿|技术|干货|国内首个API全生命周期开发者社区

更多推荐