1. First, let's understand the leader's main job

As the master node of a ZooKeeper ensemble, the leader is responsible for handling every request that changes ZooKeeper state. It numbers and orders each state-change request, guaranteeing FIFO processing of messages across the whole cluster. On top of that, it also receives and processes heartbeats.
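That numbering is the transaction id (zxid): a 64-bit long whose high 32 bits hold the leader's epoch and whose low 32 bits hold a per-epoch counter, so comparing two zxids as plain longs yields the total order. A minimal restatement of ZooKeeper's ZxidUtils helpers, which appear throughout the code below:

// Restatement of the ZxidUtils helpers: epoch in the high 32 bits,
// per-epoch counter in the low 32 bits.
public class ZxidSketch {
    public static long makeZxid(long epoch, long counter) {
        return (epoch << 32L) | (counter & 0xffffffffL);
    }
    public static long getEpochFromZxid(long zxid) {
        return zxid >> 32L;
    }
    public static long getCounterFromZxid(long zxid) {
        return zxid & 0xffffffffL;
    }
}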

2. First, the leading phase
The top-level control code (the LEADING case in QuorumPeer.run()) is as follows:

case LEADING:
    LOG.info("LEADING");
    try {
        setLeader(makeLeader(logFactory));
        leader.lead();
        setLeader(null);
    } catch (Exception e) {
        LOG.warn("Unexpected exception", e);
    } finally {
        if (leader != null) {
            leader.shutdown("Forcing shutdown");
            setLeader(null);
        }
        updateServerState();
    }
    break;

The first thing we see is that a new Leader object is created. Is it really necessary to create and set a fresh one every time? Hold that question for a moment. The Leader constructor takes the current QuorumPeer and a LeaderZooKeeperServer; leader.lead() is the call that formally enters the leading state. Stepping into that method we do indeed find a loop, so a QuorumPeer that enters the leading state stays there until an interrupt or an exception forces it out, which answers the question above: each leading phase gets a fresh Leader. For reference, makeLeader() itself is only a thin factory; a sketch of what it does (the exact signature varies slightly across versions):
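// Sketch of QuorumPeer.makeLeader(): it wires this QuorumPeer and a new
// LeaderZooKeeperServer (backed by the shared ZKDatabase) into a Leader.
protected Leader makeLeader(FileTxnSnapLog logFactory) throws IOException {
    return new Leader(this, new LeaderZooKeeperServer(logFactory, this, this.zkDb));
}

Now let's walk through what the lead() code actually does, step by step.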

self.end_fle = Time.currentElapsedTime();
long electionTimeTaken = self.end_fle - self.start_fle;
self.setElectionTimeTaken(electionTimeTaken);
LOG.info("LEADING - LEADER ELECTION TOOK - {} {}", electionTimeTaken,
        QuorumPeer.FLE_TIME_UNIT);
self.start_fle = 0;
self.end_fle = 0;

zk.registerJMX(new LeaderBean(this, zk), self.jmxLocalPeerBean);

All of the following code runs inside a single try/finally block.

self.tick.set(0);
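// zk.loadData() makes sure the in-memory database has been restored from
// the latest snapshot plus transaction logs, and then takes a fresh snapshot.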
zk.loadData();

leaderStateSummary = new StateSummary(self.getCurrentEpoch(), zk.getLastProcessedZxid());

// Start thread that waits for connection requests from
// new followers.
cnxAcceptor = new LearnerCnxAcceptor();
cnxAcceptor.start();

// Quorum barrier: block until enough servers (including the leader itself)
// have reported their accepted epochs; the agreed new epoch is one greater
// than the highest epoch seen (see the sketch further below).
long epoch = getEpochToPropose(self.getId(), self.getAcceptedEpoch());

zk.setZxid(ZxidUtils.makeZxid(epoch, 0));

synchronized(this){
    lastProposed = zk.getZxid();
}

// Build the NEWLEADER proposal at zxid (newEpoch, 0); a quorum of learners
// must ACK it before the leader can start serving.
newLeaderProposal.packet = new QuorumPacket(NEWLEADER, zk.getZxid(),
        null, null);


if ((newLeaderProposal.packet.getZxid() & 0xffffffffL) != 0) {
    LOG.info("NEWLEADER proposal has Zxid of "
            + Long.toHexString(newLeaderProposal.packet.getZxid()));
}

QuorumVerifier lastSeenQV = self.getLastSeenQuorumVerifier();
QuorumVerifier curQV = self.getQuorumVerifier();
if (curQV.getVersion() == 0 && curQV.getVersion() == lastSeenQV.getVersion()) {
    // This was added in ZOOKEEPER-1783. The initial config has version 0 (not explicitly
    // specified by the user; the lack of version in a config file is interpreted as version=0). 
    // As soon as a config is established we would like to increase its version so that it
// takes precedence over other initial configs that were not established (such as a config
    // of a server trying to join the ensemble, which may be a partial view of the system, not the full config). 
    // We chose to set the new version to the one of the NEWLEADER message. However, before we can do that
    // there must be agreement on the new version, so we can only change the version when sending/receiving UPTODATE,
// not when sending/receiving NEWLEADER. In other words, we can't change curQV here since it's the committed quorum verifier,
    // and there's still no agreement on the new version that we'd like to use. Instead, we use 
    // lastSeenQuorumVerifier which is being sent with NEWLEADER message
// so it's a good way to let followers know about the new version. (The original reason for sending
    // lastSeenQuorumVerifier with NEWLEADER is so that the leader completes any potentially uncommitted reconfigs
    // that it finds before starting to propose operations. Here we're reusing the same code path for 
    // reaching consensus on the new version number.)

    // It is important that this is done before the leader executes waitForEpochAck,
    // so before LearnerHandlers return from their waitForEpochAck
    // hence before they construct the NEWLEADER message containing
    // the last-seen-quorumverifier of the leader, which we change below
    try {
        QuorumVerifier newQV = self.configFromString(curQV.toString());
        newQV.setVersion(zk.getZxid());
        self.setLastSeenQuorumVerifier(newQV, true);
    } catch (Exception e) {
        throw new IOException(e);
    }
}

newLeaderProposal.addQuorumVerifier(self.getQuorumVerifier());
if (self.getLastSeenQuorumVerifier().getVersion() > self.getQuorumVerifier().getVersion()) {
    newLeaderProposal.addQuorumVerifier(self.getLastSeenQuorumVerifier());
}

// We have to get at least a majority of servers in sync with
// us. We do this by waiting for the NEWLEADER packet to get
// acknowledged

waitForEpochAck(self.getId(), leaderStateSummary);
self.setCurrentEpoch(epoch);

try {
    waitForNewLeaderAck(self.getId(), zk.getZxid(), LearnerType.PARTICIPANT);
} catch (InterruptedException e) {
    shutdown("Waiting for a quorum of followers, only synced with sids: [ "
            + newLeaderProposal.ackSetsToString() + " ]");
    HashSet<Long> followerSet = new HashSet<Long>();

    for (LearnerHandler f : getLearners()) {
        if (self.getQuorumVerifier().getVotingMembers().containsKey(f.getSid())) {
            followerSet.add(f.getSid());
        }
    }
    boolean initTicksShouldBeIncreased = true;
    for (Proposal.QuorumVerifierAcksetPair qvAckset : newLeaderProposal.qvAcksetPairs) {
        if (!qvAckset.getQuorumVerifier().containsQuorum(followerSet)) {
            initTicksShouldBeIncreased = false;
            break;
        }
    }
    if (initTicksShouldBeIncreased) {
        LOG.warn("Enough followers present. " +
                "Perhaps the initTicks need to be increased.");
    }
    return;
}

// A quorum has ACKed the NEWLEADER proposal: start the leader-side
// ZooKeeperServer and its request-processor chain.
startZkServer();

/**
 * WARNING: do not use this for anything other than QA testing
 * on a real cluster. Specifically to enable verification that quorum
 * can handle the lower 32bit roll-over issue identified in
 * ZOOKEEPER-1277. Without this option it would take a very long
 * time (on order of a month say) to see the 4 billion writes
 * necessary to cause the roll-over to occur.
 *
 * This field allows you to override the zxid of the server. Typically
 * you'll want to set it to something like 0xfffffff0 and then
 * start the quorum, run some operations and see the re-election.
 */
String initialZxid = System.getProperty("zookeeper.testingonly.initialZxid");
if (initialZxid != null) {
    long zxid = Long.parseLong(initialZxid);
    zk.setZxid((zk.getZxid() & 0xffffffff00000000L) | zxid);
}

if (!System.getProperty("zookeeper.leaderServes", "yes").equals("no")) {
    self.setZooKeeperServer(zk);
}

self.adminServer.setZooKeeperServer(zk);

// Everything is a go, simply start counting the ticks
// WARNING: I couldn't find any wait statement on a synchronized
// block that would be notified by this notifyAll() call, so
// I commented it out
//synchronized (this) {
//    notifyAll();
//}
// We ping twice a tick, so we only update the tick every other
// iteration
boolean tickSkip = true;
// If not null then shutdown this leader
String shutdownMessage = null;

The key calls in all of the above are:

zk.loadData();
cnxAcceptor = new LearnerCnxAcceptor();
cnxAcceptor.start();
startZkServer();
// The config-version handling and the synchronization steps in between matter too; we won't cover them here.
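One call worth pausing on is getEpochToPropose(self.getId(), self.getAcceptedEpoch()): both lead() and every LearnerHandler thread call it, and together they form a quorum barrier for agreeing on the new epoch. Below is a simplified, self-contained paraphrase of the idea (a sketch based on the 3.5.x sources quoted in this post; the real method also enforces an initLimit timeout and uses the QuorumVerifier instead of a plain count):

import java.util.HashSet;
import java.util.Set;

// Simplified paraphrase of the barrier behind Leader.getEpochToPropose():
// every caller reports the highest epoch it has accepted, the new epoch
// becomes max+1, and all callers block until a quorum has checked in.
class EpochBarrierSketch {
    private long epoch = -1;
    private boolean waitingForNewEpoch = true;
    private final Set<Long> connectingServers = new HashSet<>();
    private final int quorumSize; // e.g. n/2 + 1 voting members

    EpochBarrierSketch(int quorumSize) {
        this.quorumSize = quorumSize;
    }

    synchronized long getEpochToPropose(long sid, long lastAcceptedEpoch)
            throws InterruptedException {
        if (!waitingForNewEpoch) {
            return epoch;                  // agreement already reached
        }
        if (lastAcceptedEpoch >= epoch) {
            epoch = lastAcceptedEpoch + 1; // propose one past the max seen
        }
        connectingServers.add(sid);
        if (connectingServers.size() >= quorumSize) {
            waitingForNewEpoch = false;
            notifyAll();                   // release everyone at the barrier
        } else {
            while (waitingForNewEpoch) {
                wait();                    // the real code also has a timeout
            }
        }
        return epoch;
    }
}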

After this setup, the leader enters its main loop:

while (true) {
    synchronized (this) {
        long start = Time.currentElapsedTime();
        long cur = start;
        long end = start + self.tickTime / 2;
        while (cur < end) {
            wait(end - cur);
            cur = Time.currentElapsedTime();
        }

        if (!tickSkip) {
            self.tick.incrementAndGet();
        }

        // We use an instance of SyncedLearnerTracker to
        // track synced learners to make sure we still have a
        // quorum of current (and potentially next pending) view.
        SyncedLearnerTracker syncedAckSet = new SyncedLearnerTracker();
        syncedAckSet.addQuorumVerifier(self.getQuorumVerifier());
        if (self.getLastSeenQuorumVerifier() != null
                && self.getLastSeenQuorumVerifier().getVersion() > self
                        .getQuorumVerifier().getVersion()) {
            syncedAckSet.addQuorumVerifier(self
                    .getLastSeenQuorumVerifier());
        }

        syncedAckSet.addAck(self.getId());

        for (LearnerHandler f : getLearners()) {
            if (f.synced()) {
                syncedAckSet.addAck(f.getSid());
            }
        }

        // check leader running status
        if (!this.isRunning()) {
            // set shutdown flag
            shutdownMessage = "Unexpected internal error";
            break;
        }

        if (!tickSkip && !syncedAckSet.hasAllQuorums()) {
            // Lost quorum of last committed and/or last proposed
            // config, set shutdown flag
            shutdownMessage = "Not sufficient followers synced, only synced with sids: [ "
                    + syncedAckSet.ackSetsToString() + " ]";
            break;
        }
        tickSkip = !tickSkip;
    }
    for (LearnerHandler f : getLearners()) {
        f.ping();
    }
}
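Note the rhythm here: the leader wakes up roughly every tickTime/2 ms and pings every learner on each pass, but advances self.tick and runs the quorum-sync check only on every other pass (the tickSkip flag), i.e. twice-per-tick pings, once-per-tick checks. A tiny standalone simulation of that cadence, using a hypothetical tickTime (not a value from the source):

// Minimal simulation of the ping/tick cadence above (not ZooKeeper code).
// Learners are pinged every half tick; the tick counter only advances on
// every other wake-up.
public class TickCadence {
    public static void main(String[] args) throws InterruptedException {
        final int tickTime = 2000;       // hypothetical tickTime in ms
        int tick = 0;
        boolean tickSkip = true;
        for (int i = 0; i < 6; i++) {
            Thread.sleep(tickTime / 2);  // the leader waits half a tick
            if (!tickSkip) {
                tick++;                  // advances every other pass
                System.out.println("tick=" + tick + ": quorum sync check");
            }
            System.out.println("ping all learners");
            tickSkip = !tickSkip;
        }
    }
}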

So what this loop implements is the leader-side heartbeat. Next, let's see what cnxAcceptor = new LearnerCnxAcceptor() is actually for.

@Override
public void run() {
    try {
        while (!stop) {
            try {
                Socket s = ss.accept();
                // start with the initLimit, once the ack is processed
                // in LearnerHandler switch to the syncLimit
                s.setSoTimeout(self.tickTime * self.initLimit);
                s.setTcpNoDelay(nodelay);
                LearnerHandler fh = new LearnerHandler(s, Leader.this);
                fh.start();
            } catch (SocketException e) {
                if (stop) {
                    LOG.info("exception while shutting down acceptor: " + e);

                    // When Leader.shutdown() calls ss.close(),
                    // the call to accept throws an exception.
                    // We catch and set stop to true.
                    stop = true;
                } else {
                    throw e;
                }
            }
        }
    } catch (Exception e) {
        // Note: the original passed e.getMessage() as the only argument,
        // which SLF4J silently drops for lack of a {} placeholder; passing
        // the exception itself logs the stack trace.
        LOG.warn("Exception while accepting follower", e);
        handleException(this.getName(), e);
    }
}

As we can see, this simply runs a ServerSocket accept loop and starts a dedicated LearnerHandler thread for every incoming connection. Now let's look at the run() method of the LearnerHandler class.

try {
        tickOfNextAckDeadline = leader.self.tick.get()
                + leader.self.initLimit + leader.self.syncLimit;

        ia = BinaryInputArchive.getArchive(new BufferedInputStream(sock
                .getInputStream()));
        bufferedOutput = new BufferedOutputStream(sock.getOutputStream());
        oa = BinaryOutputArchive.getArchive(bufferedOutput);

        QuorumPacket qp = new QuorumPacket();
        ia.readRecord(qp, "packet");
        if(qp.getType() != Leader.FOLLOWERINFO && qp.getType() != Leader.OBSERVERINFO){
            LOG.error("First packet " + qp.toString()
                    + " is not FOLLOWERINFO or OBSERVERINFO!");
            return;
        }

        byte learnerInfoData[] = qp.getData();
        if (learnerInfoData != null) {
            ByteBuffer bbsid = ByteBuffer.wrap(learnerInfoData);
            if (learnerInfoData.length >= 8) {
                this.sid = bbsid.getLong();
            }
            if (learnerInfoData.length >= 12) {
                this.version = bbsid.getInt(); // protocolVersion
            }
            if (learnerInfoData.length >= 20) {
                long configVersion = bbsid.getLong();
                if (configVersion > leader.self.getQuorumVerifier().getVersion()) {
                    throw new IOException("Follower is ahead of the leader (has a later activated configuration)");
                }
            }
        } else {
            this.sid = leader.followerCounter.getAndDecrement();
        }

        if (leader.self.getView().containsKey(this.sid)) {
            LOG.info("Follower sid: " + this.sid + " : info : "
                    + leader.self.getView().get(this.sid).toString());
        } else {
            LOG.info("Follower sid: " + this.sid + " not in the current config " + Long.toHexString(leader.self.getQuorumVerifier().getVersion()));
        }

        if (qp.getType() == Leader.OBSERVERINFO) {
              learnerType = LearnerType.OBSERVER;
        }

        long lastAcceptedEpoch = ZxidUtils.getEpochFromZxid(qp.getZxid());

        long peerLastZxid;
        StateSummary ss = null;
        long zxid = qp.getZxid();
        long newEpoch = leader.getEpochToPropose(this.getSid(), lastAcceptedEpoch);
        long newLeaderZxid = ZxidUtils.makeZxid(newEpoch, 0);

        if (this.getVersion() < 0x10000) {
            // we are going to have to extrapolate the epoch information
            long epoch = ZxidUtils.getEpochFromZxid(zxid);
            ss = new StateSummary(epoch, zxid);
            // fake the message
            leader.waitForEpochAck(this.getSid(), ss);
        } else {
            byte ver[] = new byte[4];
            ByteBuffer.wrap(ver).putInt(0x10000);
            QuorumPacket newEpochPacket = new QuorumPacket(Leader.LEADERINFO, newLeaderZxid, ver, null);
            oa.writeRecord(newEpochPacket, "packet");
            bufferedOutput.flush();
            QuorumPacket ackEpochPacket = new QuorumPacket();
            ia.readRecord(ackEpochPacket, "packet");
            if (ackEpochPacket.getType() != Leader.ACKEPOCH) {
                LOG.error(ackEpochPacket.toString()
                        + " is not ACKEPOCH");
                return;
            }
            ByteBuffer bbepoch = ByteBuffer.wrap(ackEpochPacket.getData());
            ss = new StateSummary(bbepoch.getInt(), ackEpochPacket.getZxid());
            leader.waitForEpochAck(this.getSid(), ss);
        }
        peerLastZxid = ss.getLastZxid();

        // Take any necessary action if we need to send TRUNC or DIFF
        // startForwarding() will be called in all cases
        boolean needSnap = syncFollower(peerLastZxid, leader.zk.getZKDatabase(), leader);

        LOG.debug("Sending NEWLEADER message to " + sid);
        // the version of this quorumVerifier will be set by leader.lead() in case
        // the leader is just being established. waitForEpochAck makes sure that readyToStart is true if
        // we got here, so the version was set
        if (getVersion() < 0x10000) {
            QuorumPacket newLeaderQP = new QuorumPacket(Leader.NEWLEADER,
                    newLeaderZxid, null, null);
            oa.writeRecord(newLeaderQP, "packet");
        } else {
            QuorumPacket newLeaderQP = new QuorumPacket(Leader.NEWLEADER,
                    newLeaderZxid, leader.self.getLastSeenQuorumVerifier()
                            .toString().getBytes(), null);
            queuedPackets.add(newLeaderQP);
        }
        bufferedOutput.flush();

        /* if we are not truncating or sending a diff just send a snapshot */
        if (needSnap) {
            boolean exemptFromThrottle = getLearnerType() != LearnerType.OBSERVER;
            LearnerSnapshot snapshot = 
                    leader.getLearnerSnapshotThrottler().beginSnapshot(exemptFromThrottle);
            try {
                long zxidToSend = leader.zk.getZKDatabase().getDataTreeLastProcessedZxid();
                oa.writeRecord(new QuorumPacket(Leader.SNAP, zxidToSend, null, null), "packet");
                bufferedOutput.flush();

                LOG.info("Sending snapshot last zxid of peer is 0x{}, zxid of leader is 0x{}, "
                        + "send zxid of db as 0x{}, {} concurrent snapshots, " 
                        + "snapshot was {} from throttle",
                        Long.toHexString(peerLastZxid), 
                        Long.toHexString(leaderLastZxid),
                        Long.toHexString(zxidToSend), 
                        snapshot.getConcurrentSnapshotNumber(),
                        snapshot.isEssential() ? "exempt" : "not exempt");
                // Dump data to peer
                leader.zk.getZKDatabase().serializeSnapshot(oa);
                oa.writeString("BenWasHere", "signature");
                bufferedOutput.flush();
            } finally {
                snapshot.close();
            }
        }

        // Start thread that blast packets in the queue to learner
        startSendingPackets();

        /*
         * Have to wait for the first ACK, wait until
         * the leader is ready, and only then we can
         * start processing messages.
         */
        qp = new QuorumPacket();
        ia.readRecord(qp, "packet");
        if(qp.getType() != Leader.ACK){
            LOG.error("Next packet was supposed to be an ACK,"
                + " but received packet: {}", packetToString(qp));
            return;
        }

        if(LOG.isDebugEnabled()){
            LOG.debug("Received NEWLEADER-ACK message from " + sid);   
        }
        leader.waitForNewLeaderAck(getSid(), qp.getZxid(), getLearnerType());

        syncLimitCheck.start();

        // now that the ack has been processed expect the syncLimit
        sock.setSoTimeout(leader.self.tickTime * leader.self.syncLimit);

        /*
         * Wait until leader starts up
         */
        synchronized(leader.zk){
            while(!leader.zk.isRunning() && !this.isInterrupted()){
                leader.zk.wait(20);
            }
        }
        // Mutation packets will be queued during the serialize,
        // so we need to mark when the peer can actually start
        // using the data
        //
        LOG.debug("Sending UPTODATE message to " + sid);      
        queuedPackets.add(new QuorumPacket(Leader.UPTODATE, -1, null, null));

        while (true) {
            qp = new QuorumPacket();
            ia.readRecord(qp, "packet");

            long traceMask = ZooTrace.SERVER_PACKET_TRACE_MASK;
            if (qp.getType() == Leader.PING) {
                traceMask = ZooTrace.SERVER_PING_TRACE_MASK;
            }
            if (LOG.isTraceEnabled()) {
                ZooTrace.logQuorumPacket(LOG, traceMask, 'i', qp);
            }
            tickOfNextAckDeadline = leader.self.tick.get() + leader.self.syncLimit;


            ByteBuffer bb;
            long sessionId;
            int cxid;
            int type;

            switch (qp.getType()) {
            case Leader.ACK:
                if (this.learnerType == LearnerType.OBSERVER) {
                    if (LOG.isDebugEnabled()) {
                        LOG.debug("Received ACK from Observer  " + this.sid);
                    }
                }
                syncLimitCheck.updateAck(qp.getZxid());
                leader.processAck(this.sid, qp.getZxid(), sock.getLocalSocketAddress());
                break;
            case Leader.PING:
                // Process the touches
                ByteArrayInputStream bis = new ByteArrayInputStream(qp
                        .getData());
                DataInputStream dis = new DataInputStream(bis);
                while (dis.available() > 0) {
                    long sess = dis.readLong();
                    int to = dis.readInt();
                    leader.zk.touch(sess, to);
                }
                break;
            case Leader.REVALIDATE:
                bis = new ByteArrayInputStream(qp.getData());
                dis = new DataInputStream(bis);
                long id = dis.readLong();
                int to = dis.readInt();
                ByteArrayOutputStream bos = new ByteArrayOutputStream();
                DataOutputStream dos = new DataOutputStream(bos);
                dos.writeLong(id);
                boolean valid = leader.zk.checkIfValidGlobalSession(id, to);
                if (valid) {
                    try {
                        //set the session owner
                        // as the follower that
                        // owns the session
                        leader.zk.setOwner(id, this);
                    } catch (SessionExpiredException e) {
                        LOG.error("Somehow session " + Long.toHexString(id) +
                                " expired right after being renewed! (impossible)", e);
                    }
                }
                if (LOG.isTraceEnabled()) {
                    ZooTrace.logTraceMessage(LOG,
                                             ZooTrace.SESSION_TRACE_MASK,
                                             "Session 0x" + Long.toHexString(id)
                                             + " is valid: "+ valid);
                }
                dos.writeBoolean(valid);
                qp.setData(bos.toByteArray());
                queuedPackets.add(qp);
                break;
            case Leader.REQUEST:
                bb = ByteBuffer.wrap(qp.getData());
                sessionId = bb.getLong();
                cxid = bb.getInt();
                type = bb.getInt();
                bb = bb.slice();
                Request si;
                if(type == OpCode.sync){
                    si = new LearnerSyncRequest(this, sessionId, cxid, type, bb, qp.getAuthinfo());
                } else {
                    si = new Request(null, sessionId, cxid, type, bb, qp.getAuthinfo());
                }
                si.setOwner(this);
                leader.zk.submitLearnerRequest(si);
                break;
            default:
                LOG.warn("unexpected quorum packet, type: {}", packetToString(qp));
                break;
            }
        }
    } catch (IOException e) {
        if (sock != null && !sock.isClosed()) {
            LOG.error("Unexpected exception causing shutdown while sock "
                    + "still open", e);
            //close the socket to make sure the
            //other side can see it being close
            try {
                sock.close();
            } catch(IOException ie) {
                // do nothing
            }
        }
    } catch (InterruptedException e) {
        LOG.error("Unexpected exception causing shutdown", e);
    } catch (SnapshotThrottleException e) {
        LOG.error("too many concurrent snapshots: " + e);
    } finally {
        LOG.warn("******* GOODBYE "
                + (sock != null ? sock.getRemoteSocketAddress() : "<null>")
                + " ********");
        shutdown();
    }
}

The code above first reads each incoming request into a QuorumPacket, then validates and cross-checks its fields; notice that the whole flow consists of several rounds of packet sends and receives. The leader receives a packet, does the work, sends back the result, and finally waits for an ACK message, quite reminiscent of TCP's three-way handshake!
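Incidentally, the LearnerInfo payload parsed at the top of run() has a simple fixed layout; here is a hypothetical helper that builds such a payload, mirroring the ByteBuffer reads shown above (the class and method names are illustrative):

import java.nio.ByteBuffer;

// Hypothetical builder mirroring the parsing in LearnerHandler.run():
// sid (8 bytes) + protocolVersion (4 bytes) + last seen config version
// (8 bytes), 20 bytes in total.
public class LearnerInfoSketch {
    static byte[] buildLearnerInfo(long sid, int protocolVersion, long configVersion) {
        ByteBuffer bb = ByteBuffer.allocate(20);
        bb.putLong(sid);
        bb.putInt(protocolVersion);   // 0x10000 in the versions quoted here
        bb.putLong(configVersion);
        return bb.array();
    }
}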

3. What the lead process mainly handles

Look again at the core switch inside the while(true) loop of LearnerHandler.run() above: an incoming quorum packet is one of several types, REQUEST, REVALIDATE, PING, or ACK. REQUEST is where concrete client operations are handled: leader.zk.submitLearnerRequest(si) leads into prepRequestProcessor.processRequest(request), which is where a chain-of-responsibility processing pipeline takes over. One thing worth noticing: services written this way very often pair a blocking queue with a single worker thread. Follow prepRequestProcessor.processRequest(request) downward and you will find exactly that pattern, and it is widely used elsewhere too.
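A minimal sketch of that queue-plus-worker pattern (not ZooKeeper code; the names are illustrative, loosely modeled on PrepRequestProcessor's submitted-requests queue):

import java.util.concurrent.LinkedBlockingQueue;

// Producer/consumer pattern: callers enqueue requests and return
// immediately; a single worker thread drains the queue in FIFO order.
public class QueueProcessorSketch extends Thread {
    private final LinkedBlockingQueue<String> submittedRequests =
            new LinkedBlockingQueue<>();

    // Producer side: non-blocking enqueue.
    public void processRequest(String request) {
        submittedRequests.add(request);
    }

    // Consumer side: one thread processes everything, in order.
    @Override
    public void run() {
        try {
            while (true) {
                String request = submittedRequests.take(); // blocks until work arrives
                // ... validate, transform, then hand the request to the
                // next processor in the chain ...
                System.out.println("processed " + request);
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // exit cleanly on shutdown
        }
    }
}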