1、前言

又到了金三银四的时候,大家都按耐不住内心的躁动,我在这里给大家分享下之前面试中遇到的一个知识点(zookeeper应用场景),希望对大家有些帮助。如有不足,欢迎大佬们指点指点。

2、zookeeper简介

ZooKeeper 是分布式应用程序的分布式开源协调服务。它公开了一组简单的api,分布式应用程序可以基于这些api实现更高级别的同步、配置维护、分组和命名服务。它被设计为易于编程,并使用一种数据模型,该模型以熟悉的文件系统目录树结构为风格。它在 Java 中运行,并具有 Java 和 C 的绑定。

在这里插入图片描述

众所周知,协调服务很难做好。它们特别容易出现竞争条件和死锁等错误。ZooKeeper背后的动机是减轻分布式应用程序从头开始实现协调服务的负担。

在这里插入图片描述

3、zookeeper应用场景

下面的代码都需要一个序列化类,所以放在最前面声明

/**
 * @author admin
 */
public class MyZkSerializer implements ZkSerializer {

	String charset = "UTF-8";

	@Override
	public Object deserialize(byte[] bytes) throws ZkMarshallingError {
		try {
			return new String(bytes, charset);
		} catch (UnsupportedEncodingException e) {
			throw new ZkMarshallingError(e);
		}
	}

	@Override
	public byte[] serialize(Object obj) throws ZkMarshallingError {
		try {
			return String.valueOf(obj).getBytes(charset);
		} catch (UnsupportedEncodingException e) {
			throw new ZkMarshallingError(e);
		}
	}
}

3.1 配置中心

3.1.1 什么是配置中心呢?

假设咱们的项目部署在5台机子上形成一个集群,那么这5个实例在启动时读取的配置信息应该是一样的,同时一旦咱们的配置信息更改了,需要马上通知到这5个实例上并生效,这就是配置中心的功能。

在这里插入图片描述

3.1.2 zookeeper怎么实现配置中心呢?

必要条件

1、znode能存储数据
2、Watch能监听数据改变

实现方式

  1. 一个配置项对应一个zNode
// 1 将单个配置放到zookeeper上
public void putZk() {
	ZkClient client = new ZkClient("192.168.10.11:2181");
	client.setZkSerializer(new MyZkSerializer());
	String configPath = "/config1";
	String value = "1111111";
	if (client.exists(configPath)) {
		client.writeData(configPath, value);
	} else {
		client.createPersistent(configPath, value);
	}
	client.close();
}
// 需要配置的服务都从zk上取,并注册watch来实时获得配置更新
public void getConfigFromZk() {
	ZkClient client = new ZkClient("192.168.10.11:2181");
	client.setZkSerializer(new MyZkSerializer());
	String configPath = "/config1";
	String value = client.readData(configPath);
	System.out.println("从zk读到配置config1的值为:" + value);
	// 监控配置的更新,基于watch实现发布订阅功能
	client.subscribeDataChanges(configPath, new IZkDataListener() {
		@Override
		public void handleDataDeleted(String dataPath) throws Exception {
			// TODO 配置删除业务处理
		}
	
		@Override
		public void handleDataChange(String dataPath, Object data) throws Exception {
			System.out.println("获得更新的配置值:" + data);
		}
	});
	
	// 这里只是为演示实时获取到配置值更新而加的等待。实际项目应用中根据具体场景写(可用阻塞方式)
	try {
		Thread.sleep(5 * 60 * 1000);
	} catch (InterruptedException e) {
		e.printStackTrace();
	}

}

在这里插入图片描述

  1. 一个配置文件对应一个zNode
// 将配置文件的内容存放到zk节点上
public void putConfigFile2ZK() throws IOException {

	File f = new File(this.getClass().getResource("/config.xml").getFile());
	FileInputStream fin = new FileInputStream(f);
	byte[] datas = new byte[(int) f.length()];
	fin.read(datas);
	fin.close();

	ZkClient client = new ZkClient("192.168.10.11:2181");
	client.setZkSerializer(new BytesPushThroughSerializer());
	String configPath = "/config2";
	if (client.exists(configPath)) {
		client.writeData(configPath, datas);
	} else {
		client.createPersistent(configPath, datas);
	}
	client.close();
}

获取整个配置文件的方式跟步骤1类似,只不过需要解析对应的配置文件而已。

3.2 命名服务(注册中心)

3.2.1 什么是注册中心?

注册中心主要存储注册实例应用的名称和ip地址,供其他服务通过RPC来调用,其他服务只关心你的服务名是啥,而不必关心你的服务器地址对不对,有没有上线。

在这里插入图片描述

3.2.2 zookeeper怎么实现注册中心呢?

首先是服务发现问题,当一个实例启动后会向zookeeper创建一个临时节点,并存入自己的服务信息(包括应用名和ip等),其他服务通过zookeeper拿到该实例的注册信息即可调用。

一旦该服务宕机了或者主动下线,那么该临时节点则会被删除,其他服务通过watch监听到下线通知,也就不会在去调用该服务。

3.3 Master选举

3.3.1 什么是Master选举?

在一个主从部署的集群里,一般master实例负责所有请求的读写功能,其他slave实例同步master的数据,一旦master节点不可用了,那么就需要从他的slave实例中重新选举一个节点作为master实例。

在这里插入图片描述

3.3.2 zookeeper怎么实现Master选举呢?

在这里插入图片描述

首先是实例去竞争创建临时决定(Master节点),谁创建成功谁就是master,否则就是slave。
同时所有的实例都需要去servers节点(临时节点)注册自己的服务信息,方便通过该节点获取到所有在线的实例,有点类似注册中心的意思。

下面咱们通过代码来模拟一下master选举

/**
 * @author yinfeng
 */
public class Server {

        private final String cluster;
        private final String name;
        private final String address;

        private final String path, value;

        private String master;

        public Server(String cluster, String name, String address) {
            super();
            this.cluster = cluster;
            this.name = name;
            this.address = address;
            path = "/" + this.cluster + "Master";
            value = "name:" + name + " address:" + address;

            final ZkClient client = new ZkClient("192.168.10.11:2181");
            client.setZkSerializer(new MyZkSerializer());

            final Thread thread = new Thread(() -> {
                electionMaster(client);
            });
            thread.setDaemon(true);
            thread.start();
        }
		/**
		* 选举方法
		**/
        public void electionMaster(ZkClient client) {
            try {
                client.createEphemeral(path, value);
                master = client.readData(path);
                System.out.println(value + "创建节点成功,成为Master");
            } catch (ZkNodeExistsException e) {
                master = client.readData(path);
                System.out.println("Master为:" + master);
            }

            // 为阻塞自己等待而用
            final CountDownLatch cdl = new CountDownLatch(1);

            // 注册watcher
            IZkDataListener listener = new IZkDataListener() {
                @Override
                public void handleDataDeleted(String dataPath) throws Exception {
                    System.out.println("-----监听到节点被删除");
                    cdl.countDown();
                }
                @Override
                public void handleDataChange(String dataPath, Object data) throws Exception {
                }
            };

            client.subscribeDataChanges(path, listener);

            // 让自己阻塞
            if (client.exists(path)) {
                try {
                    cdl.await();
                } catch (InterruptedException e1) {
                    e1.printStackTrace();
                }
            }
            // 醒来后,取消watcher
            client.unsubscribeDataChanges(path, listener);
            // 递归调自己(下一次选举)
            electionMaster(client);
        }
    }

咱们通过启动多个服务来看看是否测试成功

public static void main(String[] args) {
    // 测试时,依次开启多个Server实例java进程,然后停止获取的master的节点,看谁抢到Master
    Server s = new Server("cluster1", "server1", "192.168.1.11:8991");
    Server s1 = new Server("cluster1", "server2", "192.168.1.11:8992");
    Server s2 = new Server("cluster1", "server3", "192.168.1.11:8993");
    Server s3 = new Server("cluster1", "server4", "192.168.1.11:8994");
    try {
        Thread.sleep(100000);
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
}

在这里插入图片描述
可以看到功能一切正常

3.4 分布式队列

3.4.1 什么是分布式队列?

队列的定义是先进先出,而在分布式环境下保证先进先出的队列就是分布式队列,有点类似于消息队列。

3.4.2 zookeeper怎么实现分布式队列?

在这里插入图片描述

由上图可知,zookeeper主要通过顺序节点来保证队列的先进先出。

3.5 分布式锁

3.5.1 什么是分布式锁?

分布式锁指的是控制分布式系统不同进程共同访问共享资源的一种锁的实现。 如果在不同的系统或同一个系统的不同主机之间共享和竞争某个临界资源,往往需要互斥来防止彼此干扰,避免出现脏数据或非业务数据,保证数据一致性。

3.5.2 zookeeper通过临时节点实现布式锁?

实现原理是zookeeper节点不可重名和watch的监听通知机制,使用临时节点主要是为了避免获取锁的节点由于异常原因无法释放锁而导致出现死锁情况。

在这里插入图片描述
竞争锁流程如下图:
在这里插入图片描述
代码实现如下

/**
 * @author yinfeng
 */
public class ZKDistributeLock implements Lock {

    private String lockPath;

    private ZkClient client;

    // 锁重入计数
    private ThreadLocal<Integer> reentrantCount = new ThreadLocal<>();

    public ZKDistributeLock(String lockPath) {
        super();
        this.lockPath = lockPath;
        client = new ZkClient("192.168.10.11:2181");
        client.setZkSerializer(new MyZkSerializer());
    }

    @Override
    public boolean tryLock() { 
    	// 锁重入不会阻塞
        if (this.reentrantCount.get() != null) {
            int count = this.reentrantCount.get();
            if (count > 0) {
                this.reentrantCount.set(++count);
                return true;
            }
        }
        // 创建节点
        try {
            client.createEphemeral(lockPath);
            this.reentrantCount.set(1);
        } catch (ZkNodeExistsException e) {
            return false;
        }
        return true;
    }

    @Override
    public void unlock() {
        // 重入释进行放锁处理
        if (this.reentrantCount.get() != null) {
            int count = this.reentrantCount.get();
            if (count > 1) {
                this.reentrantCount.set(--count);
                return;
            } else {
                this.reentrantCount.set(null);
            }
        }
        client.delete(lockPath);
    }

    @Override
    public void lock() { 
        // 如果获取不到锁,阻塞等待
        if (!tryLock()) {
            // 没获得锁,阻塞自己
            waitForLock();
            // 再次尝试
            lock();
        }

    }

    private void waitForLock() {
        final CountDownLatch cdl = new CountDownLatch(1);
        IZkDataListener listener = new IZkDataListener() {
            @Override
            public void handleDataDeleted(String dataPath) throws Exception {
                System.out.println("----收到节点被删除了-------------");
                cdl.countDown();
            }
            @Override
            public void handleDataChange(String dataPath, Object data) throws Exception {
            }
        };
        client.subscribeDataChanges(lockPath, listener);
        // 阻塞自己
        if (this.client.exists(lockPath)) {
            try {
                cdl.await();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
        // 取消注册
        client.unsubscribeDataChanges(lockPath, listener);
    }

    @Override
    public void lockInterruptibly() {
    }

    @Override
    public boolean tryLock(long time, TimeUnit unit) {
        return false;
    }

    @Override
    public Condition newCondition() {
        return null;
    }
}

咱们在写个测试类试一下效果,通过多线程来模拟多实例竞争锁

public static void main(String[] args) {
    // 并发数
    int currency = 50;
    // 循环屏障
    final CyclicBarrier cb = new CyclicBarrier(currency);
    // 多线程模拟高并发
    for (int i = 0; i < currency; i++) {
        new Thread(() -> {
            System.out.println(Thread.currentThread().getName() + "---------我准备好---------------");
            // 等待一起出发
            try {
                cb.await();
            } catch (InterruptedException | BrokenBarrierException e) {
                e.printStackTrace();
            }
            ZKDistributeLock lock = new ZKDistributeLock("/distLock11");
            try {
                lock.lock();
                System.out.println(Thread.currentThread().getName() + " 获得锁!");
                try {
                    Thread.sleep(1000 * 2);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            } finally {
                lock.unlock();
                System.out.println(Thread.currentThread().getName() + " 释放锁!");
            }
        }
        ).start();
    }
}

在这里插入图片描述

可以看到功能是正常的,但也有个很明显的问题,就是一旦释放锁之后所有的实例(线程)都会收到通知然后去重新竞争锁,当实例的数量达到一定程度之后,那么势必会对zookeeper造成很大的带宽和性能消耗,严重的话可能会把zookeeper集群搞挂了,这种情况也叫惊群效应,所以只通过顺序节点实现分布式锁还是有一定的问题的,下面咱们再来优化一下。

3.5.3 zookeeper通过临时顺序节点实现布式锁?

既然通过临时节点会造成惊群效应,那么咱们是否能将临时和顺序节点结合起来,通过最小的那个zNode节点来视为获得锁的标志呢?
答案是肯定能的,当释放锁时只通知他的下一个节点即可,完美的避免了惊群效应的发生。

原理图如下
在这里插入图片描述
流程图如下
在这里插入图片描述
接着咱们通过代码来实现吧

/**
 * @author yinfeng
 */
public class ZKDistributeImproveLock implements Lock {

    /**
     * 利用临时顺序节点来实现分布式锁
     * 获取锁:取排队号(创建自己的临时顺序节点),然后判断自己是否是最小号,如是,则获得锁;不是,则注册前一节点的watcher,阻塞等待
     * 释放锁:删除自己创建的临时顺序节点
     */
    private final String lockPath;

    private final ZkClient client;

    private ThreadLocal<String> currentPath = new ThreadLocal<>();

    private ThreadLocal<String> beforePath = new ThreadLocal<>();

    /**
     * 锁重入计数
     */
    private ThreadLocal<Integer> reentrantCount = new ThreadLocal<>();

    public ZKDistributeImproveLock(String lockPath) {
        super();
        this.lockPath = lockPath;
        client = new ZkClient("192.168.10.11:2181");
        client.setZkSerializer(new MyZkSerializer());
        if (!this.client.exists(lockPath)) {
            try {
                this.client.createPersistent(lockPath);
            } catch (ZkNodeExistsException ignored) {

            }
        }
    }

    @Override
    public boolean tryLock() {
    	// 重入则直接返回获得锁成功
        if (this.reentrantCount.get() != null) {
            int count = this.reentrantCount.get();
            if (count > 0) {
                this.reentrantCount.set(++count);
                return true;
            }
        }

        if (this.currentPath.get() == null) {
            currentPath.set(this.client.createEphemeralSequential(lockPath + "/", "aaa"));
        }
        // 获得所有的子节点
        List<String> children = this.client.getChildren(lockPath);

        // 排序list
        Collections.sort(children);

        // 判断当前节点是否是最小的
        if (currentPath.get().equals(lockPath + "/" + children.get(0))) {
            this.reentrantCount.set(1);
            return true;
        } else {
            // 取到前一个
            // 得到字节的索引号
            int curIndex = children.indexOf(currentPath.get().substring(lockPath.length() + 1));
            beforePath.set(lockPath + "/" + children.get(curIndex - 1));
        }
        return false;
    }

    @Override
    public void lock() {
        if (!tryLock()) {
            // 阻塞等待
            waitForLock();
            // 再次尝试加锁
            lock();
        }
    }
    
    private void waitForLock() {
        final CountDownLatch cdl = new CountDownLatch(1);
        // 注册watcher
        IZkDataListener listener = new IZkDataListener() {
            @Override
            public void handleDataDeleted(String dataPath) throws Exception {
                System.out.println("-----监听到节点被删除");
                cdl.countDown();
            }

            @Override
            public void handleDataChange(String dataPath, Object data) throws Exception {
            }
        };

        client.subscribeDataChanges(this.beforePath.get(), listener);
        // 让自己阻塞
        if (this.client.exists(this.beforePath.get())) {
            try {
                cdl.await();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
        // 醒来后,取消watcher
        client.unsubscribeDataChanges(this.beforePath.get(), listener);
    }

    @Override
    public void unlock() {
        // 重入的释放锁处理
        if (this.reentrantCount.get() != null) {
            int count = this.reentrantCount.get();
            if (count > 1) {
                this.reentrantCount.set(--count);
                return;
            } else {
                this.reentrantCount.set(null);
            }
        }
        // 删除节点
        this.client.delete(this.currentPath.get());
    }

    @Override
    public void lockInterruptibly() throws InterruptedException {

    }

    @Override
    public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {
        return false;
    }

    @Override
    public Condition newCondition() {
        return null;
    }
}

最后咱们再来测试一下

public static void main(String[] args) {
    // 并发数
    int currency = 50;
    // 循环屏障
    final CyclicBarrier cb = new CyclicBarrier(currency);
    // 多线程模拟高并发
    for (int i = 0; i < currency; i++) {
        new Thread(() -> {

            System.out.println(Thread.currentThread().getName() + "---------我准备好---------------");
            // 等待一起出发
            try {
                cb.await();
            } catch (InterruptedException | BrokenBarrierException e) {
                e.printStackTrace();
            }
            ZKDistributeImproveLock lock = new ZKDistributeImproveLock("/distLock");

            try {
                lock.lock();
                System.out.println(Thread.currentThread().getName() + " 获得锁!");
                try {
                    Thread.sleep(1000 * 2);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            } finally {
                lock.unlock();
                System.out.println(Thread.currentThread().getName() + " 释放锁!");
            }
        }).start();
    }
}

在这里插入图片描述

可以看到功能是正常的,同时在释放锁的时候只通知了下一节点,没有出现惊群效应,非常完美。

4、总结

在微服务和分布式的时代,zookeeper作为协调服务的代表,在面试中很容易被问到,希望大家能掌握这方面的知识,提高自己的核心竞争力,在谈薪的时候拿到最高的那个区间。

最后,外出打工不易,希望各位兄弟找到自己心仪的工作,虎年发发发! 也希望兄弟们能关注、点赞、收藏、评论支持一波,非常感谢大家!

Logo

权威|前沿|技术|干货|国内首个API全生命周期开发者社区

更多推荐