How It Works

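A first-in, first-out (FIFO) queue is the most commonly used kind of queue. Implementing one with ZooKeeper means creating PERSISTENT_SEQUENTIAL nodes under a designated directory. When a node is created successfully, a Watcher notifies the waiting consumers, and a consumer deletes the node with the smallest sequence number to consume it. In this setup the znodes serve as message storage: the data stored in a znode is the message body, and the SEQUENTIAL sequence number is the message ID, so messages only need to be taken out in order. Because the nodes are persistent, they outlive the session of the client that created them, so there is no need to worry about queued messages being lost.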
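Taking messages "in order" relies on how sequential node names look: ZooKeeper appends a 10-digit, zero-padded counter to the path, so sorting the child names as plain strings gives the same order as sorting them numerically. The following is a minimal sketch of that selection step only; it does not talk to ZooKeeper, and the class name `SequenceOrderSketch` and the method `head` are hypothetical names chosen for illustration.

```java
import java.util.Arrays;
import java.util.List;
import java.util.TreeSet;

public class SequenceOrderSketch {

    // Returns the child name a FIFO consumer would take next: the smallest
    // name in lexicographic order, or null when the queue directory is empty.
    public static String head(List<String> children) {
        if (children == null || children.isEmpty()) {
            return null;
        }
        return new TreeSet<String>(children).first();
    }

    public static void main(String[] args) {
        // getChildren() makes no ordering promise, so the names may arrive shuffled
        List<String> children = Arrays.asList("0000000010", "0000000002", "0000000009");
        System.out.println(head(children)); // prints 0000000002
    }
}
```

Without the zero padding, "10" would sort before "2" as a string, which is why the fixed-width suffix matters for this approach.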


Use Cases

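A ZooKeeper queue is not well suited to workloads that demand high performance, but it is worth considering when the data volume is small. For example, if a project already uses ZooKeeper and needs a small-scale queue, a ZooKeeper-based queue is a reasonable choice; after all, introducing a message broker adds system complexity and operational burden.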


Full Code

ZooKeeperClient utility class

package org.massive.common;

import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooKeeper;

import java.io.IOException;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

/**
 * Created by Massive on 2016/12/18.
 */
public class ZooKeeperClient {

    private static String connectionString = "localhost:2181";
    private static int sessionTimeout = 10000;


    public static ZooKeeper getInstance() throws IOException, InterruptedException {
        //--------------------------------------------------------------
        // Calling get/create/exists before the connection is established fails with
        // KeeperErrorCode = ConnectionLoss, so wait for the connection to complete
        // before returning the instance.
        //--------------------------------------------------------------
        final CountDownLatch connectedSignal = new CountDownLatch(1);
        ZooKeeper zk = new ZooKeeper(connectionString, sessionTimeout, new Watcher() {
             @Override
             public void process(WatchedEvent event) {
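                 if (event.getState() == Event.KeeperState.SyncConnected) {
                     connectedSignal.countDown();
                 } else if (event.getState() == Event.KeeperState.Expired) {
                     // Session expired: this handle is no longer usable and the
                     // caller has to obtain a new instance. Not handled in this demo.
                 }
             }
         });
        // await() returns false on timeout; fail fast rather than return an unconnected client
        if (!connectedSignal.await(sessionTimeout, TimeUnit.MILLISECONDS)) {
            zk.close();
            throw new IOException("Timed out connecting to ZooKeeper at " + connectionString);
        }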
        return zk;
    }

    public static int getSessionTimeout() {
        return sessionTimeout;
    }

    public static void setSessionTimeout(int sessionTimeout) {
        ZooKeeperClient.sessionTimeout = sessionTimeout;
    }

}

ZooKeeperQueue class

package org.massive.queue;

import org.apache.commons.lang3.RandomUtils;
import org.apache.zookeeper.*;
import org.apache.zookeeper.data.Stat;
import org.massive.common.ZooKeeperClient;

import java.io.IOException;
import java.io.UnsupportedEncodingException;
import java.util.List;
import java.util.SortedSet;
import java.util.TreeSet;

/**
 * Created by Allen on 2016/12/22.
 */
public class ZooKeeperQueue {

    private ZooKeeper zk;
    private int sessionTimeout;

    private static byte[] ROOT_QUEUE_DATA = {0x12,0x34};
    private static String QUEUE_ROOT = "/QUEUE";
    private String queueName;
    private String queuePath;
    private Object mutex = new Object();

    public ZooKeeperQueue(String queueName) throws IOException, KeeperException, InterruptedException {
        this.queueName = queueName;
        this.queuePath = QUEUE_ROOT + "/" + queueName;
        this.zk = ZooKeeperClient.getInstance();
        this.sessionTimeout = zk.getSessionTimeout();

        //----------------------------------------------------
        // Make sure the queue root /QUEUE and this queue's directory exist
        //----------------------------------------------------
        ensureExists(QUEUE_ROOT);
        ensureExists(queuePath);
    }

    public byte[] consume() throws InterruptedException, KeeperException, UnsupportedEncodingException {
        List<String> nodes = null;
        byte[] returnVal = null;
        Stat stat = null;
        do {
            synchronized (mutex) {

                nodes = zk.getChildren(queuePath, new ProduceWatcher());

                //----------------------------------------------------
                // If there are no message nodes, wait for a notification from a producer
                //----------------------------------------------------
                if (nodes == null || nodes.size() == 0) {
                    mutex.wait();
                } else {

                    SortedSet<String> sortedNode = new TreeSet<String>();
                    for (String node : nodes) {
                        sortedNode.add(queuePath + "/" + node);
                    }

                    //----------------------------------------------------
                    // Consume the message with the smallest sequence number
                    //----------------------------------------------------
                    String first = sortedNode.first();
                    returnVal = zk.getData(first, false, stat);
                    zk.delete(first, -1);

                    System.out.print(Thread.currentThread().getName() + " ");
                    System.out.print("consume a message from queue:" + first);
                    System.out.println(", message data is: " + new String(returnVal,"UTF-8"));

                    return returnVal;
                }
            }
        } while (true);
    }

    class ProduceWatcher implements Watcher {
        @Override
        public void process(WatchedEvent event) {
            //----------------------------------------------------
            // The queue's children changed (for example, a message was produced);
            // wake up one waiting thread
            //----------------------------------------------------
            synchronized (mutex) {
                mutex.notify();
            }
        }
    }

    public void produce(byte[] data) throws KeeperException, InterruptedException, UnsupportedEncodingException {

        //----------------------------------------------------
        // Make sure this queue's directory exists
        // example: /QUEUE/queueName
        //----------------------------------------------------
        ensureExists(queuePath);

        String node = zk.create(queuePath + "/", data,
                ZooDefs.Ids.OPEN_ACL_UNSAFE,
                CreateMode.PERSISTENT_SEQUENTIAL);


        System.out.print(Thread.currentThread().getName() + " ");
        System.out.print("produce a message to queue:" + node);
        System.out.println(" , message data is: " + new String(data,"UTF-8"));
    }


    public void ensureExists(String path) {
        try {
            Stat stat = zk.exists(path, false);
            if (stat == null) {
                zk.create(path, ROOT_QUEUE_DATA, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
            }
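        } catch (KeeperException.NodeExistsException e) {
            // Another client created the node between exists() and create(); nothing to do
        } catch (KeeperException e) {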
            e.printStackTrace();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }



    public static void main(String[] args) throws IOException, InterruptedException, KeeperException {

        String queueName = "test";
        final ZooKeeperQueue queue = new ZooKeeperQueue(queueName);

        for (int i = 0; i < 10; i++) {
            new Thread(new Runnable() {
                @Override
                public void run() {
                    try {
                        queue.consume();
                        System.out.println("--------------------------------------------------------");
                        System.out.println();
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    } catch (KeeperException e) {
                        e.printStackTrace();
                    } catch (UnsupportedEncodingException e) {
                        e.printStackTrace();
                    }
                }
            }).start();
        }

        new Thread(new Runnable() {
            @Override
            public void run() {
                for (int i = 0; i < 10; i++) {
                    try {
                        Thread.sleep(RandomUtils.nextInt(100 * i, 200 * i));
                        // Encode explicitly as UTF-8 to match the decoding in produce()/consume()
                        queue.produce(("massive" + i).getBytes("UTF-8"));
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    } catch (KeeperException e) {
                        e.printStackTrace();
                    } catch (UnsupportedEncodingException e) {
                        e.printStackTrace();
                    }
                }
            }
        },"Produce-thread").start();


    }

}
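
One point worth noting about consume() above: the `mutex` only serializes consumer threads inside a single JVM. If consumers run in several processes, two of them can read the same smallest node, and the second `zk.delete` then fails with `KeeperException.NoNodeException`. A common way to handle this is to treat the delete as the claim: whoever deletes the node owns the message, and a loser moves on to the next candidate. The sketch below simulates that idea with an in-memory sorted map standing in for the queue directory, so it runs without a ZooKeeper server. The class `ClaimByDeleteSketch` and its methods are hypothetical, and the mapping to ZooKeeper calls is described only in comments.

```java
import java.nio.charset.StandardCharsets;
import java.util.concurrent.ConcurrentSkipListMap;

public class ClaimByDeleteSketch {

    // Stand-in for the queue directory: child name -> message bytes, kept sorted by name.
    private final ConcurrentSkipListMap<String, byte[]> children =
            new ConcurrentSkipListMap<String, byte[]>();
    private long nextSequence = 0;

    // Mimics create(..., PERSISTENT_SEQUENTIAL): the name is a zero-padded counter.
    public synchronized String produce(byte[] data) {
        String name = String.format("%010d", nextSequence++);
        children.put(name, data);
        return name;
    }

    // Walks the candidates from the smallest name upward. The atomic remove is the
    // claim: a non-null result means this caller won the message. A null result
    // means another consumer removed it first, which is the case where a real
    // zk.delete() would throw NoNodeException, so the loop tries the next name.
    public byte[] tryConsume() {
        for (String name : children.keySet()) {
            byte[] data = children.remove(name);
            if (data != null) {
                return data;
            }
        }
        return null; // queue is empty; a real consumer would wait on a watch here
    }

    public static void main(String[] args) {
        ClaimByDeleteSketch queue = new ClaimByDeleteSketch();
        queue.produce("massive0".getBytes(StandardCharsets.UTF_8));
        queue.produce("massive1".getBytes(StandardCharsets.UTF_8));
        System.out.println(new String(queue.tryConsume(), StandardCharsets.UTF_8)); // prints massive0
        System.out.println(new String(queue.tryConsume(), StandardCharsets.UTF_8)); // prints massive1
        System.out.println(queue.tryConsume() == null); // prints true
    }
}
```

In a real ZooKeeper consumer the equivalent would be to wrap the getData/delete pair in a try block, catch `KeeperException.NoNodeException`, and continue with the next child instead of letting the exception propagate.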


Test

Running the main method produced the following output on one run on my machine:
Produce-thread produce a message to queue:/QUEUE/test/0000000000 , message data is: massive0
Thread-8 consume a message from queue:/QUEUE/test/0000000000, message data is: massive0
--------------------------------------------------------

Produce-thread produce a message to queue:/QUEUE/test/0000000001 , message data is: massive1
Thread-6 consume a message from queue:/QUEUE/test/0000000001, message data is: massive1
--------------------------------------------------------

Produce-thread produce a message to queue:/QUEUE/test/0000000002 , message data is: massive2
Thread-3 consume a message from queue:/QUEUE/test/0000000002, message data is: massive2
--------------------------------------------------------

Produce-thread produce a message to queue:/QUEUE/test/0000000003 , message data is: massive3
Thread-0 consume a message from queue:/QUEUE/test/0000000003, message data is: massive3
--------------------------------------------------------

Produce-thread produce a message to queue:/QUEUE/test/0000000004 , message data is: massive4
Thread-5 consume a message from queue:/QUEUE/test/0000000004, message data is: massive4
--------------------------------------------------------

Produce-thread produce a message to queue:/QUEUE/test/0000000005 , message data is: massive5
Thread-2 consume a message from queue:/QUEUE/test/0000000005, message data is: massive5
--------------------------------------------------------

Produce-thread produce a message to queue:/QUEUE/test/0000000006 , message data is: massive6
Thread-4 consume a message from queue:/QUEUE/test/0000000006, message data is: massive6
--------------------------------------------------------

Produce-thread produce a message to queue:/QUEUE/test/0000000007 , message data is: massive7
Thread-9 consume a message from queue:/QUEUE/test/0000000007, message data is: massive7
--------------------------------------------------------

Produce-thread produce a message to queue:/QUEUE/test/0000000008 , message data is: massive8
Thread-7 consume a message from queue:/QUEUE/test/0000000008, message data is: massive8
--------------------------------------------------------

Produce-thread produce a message to queue:/QUEUE/test/0000000009 , message data is: massive9
Thread-1 consume a message from queue:/QUEUE/test/0000000009, message data is: massive9
--------------------------------------------------------

This article is original work. If you repost it, please credit the source: http://blog.csdn.net/massivestars/article/details/53870399


For implementing a distributed lock with ZooKeeper, see: Implementing a Distributed Lock with ZooKeeper

