应用场景

分布式系统最典型的架构就是 一主多从 。在很多时候,虽然处理大规模的数据、图像和文件等,这种工作极其耗资源而且数据、文件等都是共享的,若全部机器都计算处理一次会浪费保贵的计算资源;我们可以把这些工作交给一台机器处理,其它机器则通过数据库、分布式文件系统等方式共享计算成果Leader(Master)。另外,对于数据库、缓存等组件读写分离是惯用的提高性能的方式;读写分离是把写全部给leader(master),查询则使用follower的机器。使用Zookeeper提供的API可轻松实现leader选举。

具体步骤

1、客户端连接时,在指定的目录(这里假定为"/leader")创建一个 EPHEMERAL_SEQUENTIAL 的节点,把内网的IP数据存入创建节点。
2、获取目录的子点节,并取得序列号最小的节点,我们把这个节点设置为leader。当此节点被删除时,证明leader断线。
3、其它机器监听leader节点,当leader节点的删除时,再取目录的最小子点节作为leader。


详细代码

zk客户端工具类
package org.massive.common;

import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooKeeper;

import java.io.IOException;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

/**
 * Created by Massive on 2016/12/18.
 */
public class ZooKeeperClient {

    private static String connectionString = "localhost:2181";
    private static int sessionTimeout = 10000;


    public static ZooKeeper getInstance() throws IOException, InterruptedException {
        //--------------------------------------------------------------
        // 为避免连接还未完成就执行zookeeper的get/create/exists操作引起的(KeeperErrorCode = ConnectionLoss)
        // 这里等Zookeeper的连接完成才返回实例
        //--------------------------------------------------------------
        final CountDownLatch connectedSignal = new CountDownLatch(1);
        ZooKeeper zk = new ZooKeeper(connectionString, sessionTimeout, new Watcher() {
            @Override
            public void process(WatchedEvent event) {
                if (event.getState() == Event.KeeperState.SyncConnected) {
                    connectedSignal.countDown();
                }
            }
        });
        connectedSignal.await(sessionTimeout, TimeUnit.MILLISECONDS);
        return zk;
    }

    public static int getSessionTimeout() {
        return sessionTimeout;
    }

    public static void setSessionTimeout(int sessionTimeout) {
        ZooKeeperClient.sessionTimeout = sessionTimeout;
    }

}

LeaderElection
package org.massive.group;

import org.apache.zookeeper.*;
import org.apache.zookeeper.data.Stat;
import org.massive.common.ZooKeeperClient;

import java.io.IOException;
import java.io.UnsupportedEncodingException;
import java.net.*;
import java.util.*;
import java.util.concurrent.CountDownLatch;

/**
 * Created by Massive on 2016/12/25.
 */
public class LeaderElection {

    private ZooKeeper zk;
    private int sessionTimeout;

    private static byte[] DEFAULT_DATA = {0x12,0x34};
    private static Object mutex = new Object();

    private static String ROOT = "/leader";

    private byte[] localhost = getLocalIpAdressBytes();

    private String znode;

    private static CountDownLatch firstElectionSignal = new CountDownLatch(1);

    //----------------------------------------------------
    // leader的IP地址
    //----------------------------------------------------
    private static String leader;

    public LeaderElection() throws IOException, InterruptedException, KeeperException {
        this.zk = ZooKeeperClient.getInstance();
        this.sessionTimeout = zk.getSessionTimeout();
        ensureExists(ROOT);
        ensureLocalNodeExists();

        System.out.println("-------------------------------------");
        System.out.println("local IP: " + getLocalIpAddress());
        System.out.println("local created node: " + znode);
        System.out.println("-------------------------------------");


    }

    /**
     * 检查本机是否已创建节点,不存在则创建
     * @throws KeeperException
     * @throws InterruptedException
     */
    public void ensureLocalNodeExists() throws KeeperException, InterruptedException {
        List<String> list = zk.getChildren(ROOT,new NodeDeleteWatcher());
        for (String node : list) {
            Stat stat = new Stat();
            String path = ROOT + "/" + node;
            byte[] data = zk.getData(path,false,stat);

            if (Arrays.equals(data,localhost)) {
                znode = path;
                return;
            }
        }
        znode = zk.create(ROOT + "/", localhost, ZooDefs.Ids.OPEN_ACL_UNSAFE,CreateMode.EPHEMERAL_SEQUENTIAL);

    }

    public void ensureExists(String path) {
        try {
            Stat stat = zk.exists(path, false);
            if (stat == null) {
                zk.create(path, DEFAULT_DATA, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
            }
        } catch (KeeperException e) {
            e.printStackTrace();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

    public void start() throws KeeperException, InterruptedException, UnsupportedEncodingException {

        do{
            synchronized (mutex) {

                System.out.println("begin leader election...");

                List<String> nodes = zk.getChildren(ROOT, null);
                SortedSet<String> sortedNode = new TreeSet<String>();
                for (String node : nodes) {
                    sortedNode.add(ROOT + "/" + node);
                }
                //----------------------------------------------------
                // 取出序列号最小的消息
                //----------------------------------------------------
                String first = sortedNode.first();
                leader = first;
                //----------------------------------------------------
                // 监控序列最小节点(非本机创建节点)
                //----------------------------------------------------
                NodeDeleteWatcher watcher = znode.equals(first) ? null : new NodeDeleteWatcher();
                byte[] data = zk.getData(first, watcher, null);
                leader = new String(data, "UTF-8");

                System.out.println("leader election end, the leader is : " + leader);

                if (firstElectionSignal.getCount() != 0) {
                    firstElectionSignal.countDown();
                }

                if (znode.equals(first)) {
                    return;
                }

                mutex.wait();
            }
        } while (true);


    }

    class NodeDeleteWatcher implements Watcher {
        @Override
        public void process(WatchedEvent event) {
            if (event.getType() == Event.EventType.NodeDeleted) {
                synchronized (mutex) {
                    mutex.notify();
                }
            }
        }
    }

    /**
     * 获取本地内网IP地址
     * @return
     * @throws SocketException
     */
    public static String getLocalIpAddress() throws SocketException {
        // 获得本机的所有网络接口
        Enumeration<NetworkInterface> nifs = NetworkInterface.getNetworkInterfaces();

        while (nifs.hasMoreElements()) {
            NetworkInterface nif = nifs.nextElement();

            // 获得与该网络接口绑定的 IP 地址,一般只有一个
            Enumeration<InetAddress> addresses = nif.getInetAddresses();
            while (addresses.hasMoreElements()) {
                InetAddress addr = addresses.nextElement();

                String ip = addr.getHostAddress();
                // 只关心 IPv4 地址
                if (addr instanceof Inet4Address && !"127.0.0.1".equals(ip)) {
                    return ip;
                }
            }
        }
        return null;
    }

    public static byte[] getLocalIpAdressBytes() throws SocketException {
        String ip = getLocalIpAddress();
        return ip == null ? null : ip.getBytes();
    }

    public static String getLeader() throws InterruptedException {
        //----------------------------------------------------
        // 第一次leader选举完成后才释放阀门
        //----------------------------------------------------
        firstElectionSignal.await();
        return leader;
    }

    public static void main(String[] args) throws IOException, InterruptedException, KeeperException {

        //-------------------------------------------------------
        // 启动一条线程用处理leader选举
        //-------------------------------------------------------
        new Thread(new Runnable() {
            @Override
            public void run() {
                try {
                    LeaderElection leaderElection = new LeaderElection();
                    leaderElection.start();
                } catch (KeeperException e) {
                    e.printStackTrace();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                } catch (UnsupportedEncodingException e) {
                    e.printStackTrace();
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
        }).start();

        String leader = LeaderElection.getLeader();

    }
}


测试

运行main方法,本机器的某次输出结果
-------------------------------------
local IP: 192.168.1.103
local created node: /leader/0000000017
-------------------------------------
begin leader election...
leader election end, the leader is : 192.168.1.103

注:本文只进行了简单的测试,要在生产环境中使用请先同时多在个JVM上进行测试。


ZooKeeper实现分布式锁和队列可参考:



Logo

权威|前沿|技术|干货|国内首个API全生命周期开发者社区

更多推荐