并发编程栏目代码 GitHub package 地址: 点击打开链接

博客并发编程栏目 : 点击打开链接



51CTO 译文框架介绍: http://developer.51cto.com/art/201306/399370.htm




#Disruptor简单说明

Disruptor 的源码非常精简,没有任何配置文件,所有源文件类加起来也就 58 个(不同版本可能不一样),
用代码行统计工具算了下,一共 6306 行。
对于一个能做到如此成功的开源工具来说,能有这么精短的代码量,确实很不错。


##Disruptor 代码共分为四个包:


1). com.lmax.disruptor: 大部分文件存放于这个目录下,包括 Disruptor 中重要的类文件,
    包括:EventProcessor、RingBuffer、Sequence、Sequencer、WaitStrategy 等
2). com.lmax.disruptor.collections: 该目录下只有一个类:Histogram,
    它不是 Disruptor 运行的必须类,其实我也没用过它,从源码注释来看,
    该类的作用是,在一个对性能要求很高的、有多个消费者的系统中,Histogram 可以用来记录系统耗各个组件的耗时情况,
    并以直方图的形式展示出来。初学 Disruptor 可以不用管关心它。
3). com.lmax.disruptor.dsl: 该包中保存了消费者和生产者的一些信息,核心类文件 Disruptor 也存放在该目录下。
4). com.lmax.disruptor.util: 该包中存放了几个辅助操作类,
    如 Util 类,DaemonThreadFactory 类,PaddedLong 类,该类用来做缓冲行填充的。


简单实现生产者消费者模型

其实所有的事件处理无非都是生产者消费者模型的


事件对象 - 商品对象


/**
 * Created by lw on 14-7-3.
 * <p>
 * 事件对象 - 商品对象
 * <p>
 * 定义 ValueEvent 类,该类作为填充 RingBuffer 的消息,
 * 生产者向该消息中填充数据(就是修改 value 属性值,后文用生产消息代替),
 * 消费者从消息体中获取数据(获取 value 值,后文用消费消息代替)
 *
 * @see com.thread.concurrent_.disruptor.DeliveryReportEventHandler
 */
public final class ValueEvent {

    private String value;//模拟任务数据

    public String getValue() {
        return value;
    }

    public void setValue(String value) {
        this.value = value;
    }

    //定义生成的事件对象,注册到创建 Disruptor 对象
    public final static EventFactory<ValueEvent> EVENT_FACTORY
            = new EventFactory<ValueEvent>() {
        public ValueEvent newInstance() {
            return new ValueEvent();
        }
    };
}



事件处理者 - 消费者


/**
 * 事件处理者 - 消费者
 * @author lw by 14-7-22.
 */
public class DeliveryReportEventHandler implements EventHandler<ValueEvent> {

    private int id;//消费者编号

    public DeliveryReportEventHandler(int id) {
        this.id = id;
    }

    public int getId() {
        return id;
    }

    public void setId(int id) {
        this.id = id;
    }

    @Override
    public String toString() {
        return "DeliveryReportEventHandler{" +
                "id=" + id +
                '}';
    }

    /**
     * @param event  事件
     * @param sequence   事件正在处理
     * @param endOfBatch 是否是最后一个事件在处理
     * @throws Exception Exception
     */
    @Override
    public void onEvent(ValueEvent event, long sequence, boolean endOfBatch) throws Exception {
        Thread.sleep(2000);
        System.out.println(this + "\tevent:\t" + event.getValue()
                + "\tsequence:\t" + sequence
                + "\tendOfBatch:\t" + endOfBatch);
    }
}



生产者 Disruptor - 核心内容


/**
 * 生产者 Disruptor - 核心内容
 *
 * @author lw by 14-7-2.
 */
public class Disruptor_Example {

    private static final int RINGBUFFER_SIZE = 16;//这个参数应该是2的幂,否则程序会抛出异常:
    private static RingBuffer<ValueEvent> ringBuffer;//定义环形数组内存
    private static Disruptor<ValueEvent> disruptor;
    private static final ExecutorService SERVICE//线程池
            = Executors.newCachedThreadPool();

    /**
     * 创建 Disruptor 对象。
     * Disruptor 类是 Disruptor 项目的核心类,另一个核心类之一是 RingBuffer。
     * 如果把 Disruptor 比作计算机的 cpu ,作为调度中心的话,那么 RingBuffer ,就是计算机的 Memory 。
     * 第一个参数,是一个 EventFactory 对象,它负责创建 ValueEvent 对象,并填充到 RingBuffer 中;
     * 第二个参数,指定 RingBuffer 的大小。这个参数应该是2的幂,否则程序会抛出异常:
     * 第三个参数,就是之前创建的 ExecutorService 对象。
     */
    private static void init() {
        disruptor = new Disruptor<ValueEvent>(
                ValueEvent.EVENT_FACTORY,
                RINGBUFFER_SIZE,
                SERVICE,
                ProducerType.MULTI,
                new TimeoutBlockingWaitStrategy(1000, TimeUnit.MINUTES)
        );
    }

    /**
     * 添加消费者对象
     * {@link com.thread.concurrent_.disruptor.DeliveryReportEventHandler}
     *
     * @param eventHandlers 消费者对象
     */
    private static void handleEventsWith(EventHandler[] eventHandlers) {
        disruptor.handleEventsWith(eventHandlers);
    }

    /**
     * 启动disruptor
     */
    private static void start() {
        ringBuffer = disruptor.start();
    }

    /**
     * 生产者线程
     * 通过 next 方法,获取 RingBuffer 可写入的消息索引号 seq;
     * 通过 seq 检索消息;
     * 修改消息的 value 属性;
     * 通过 publish 方法,告知消费者线程,当前索引位置的消息可被消费了
     *
     * @param event 事件
     */
    private static void addEVent(ValueEvent event) {

        if (hasCapacity()) {
            System.out.println("disruptor:ringbuffer 剩余量低于 10 %");
        } else {
            long seq = ringBuffer.next();
            /**
             * @see com.thread.concurrent_.disruptor.ValueEvent.<com.thread.concurrent_.disruptor.DeliveryReportEventHandler>
             */
            ValueEvent valueEvent = ringBuffer.get(seq);//获取可用位置
            valueEvent.setValue(event.getValue());//填充可用位置
            ringBuffer.publish(seq);//通知消费者
        }
    }

    /**
     * 停止 Disruptor系统(停止消费者线程)
     */
    private static void shutdown() {
        disruptor.shutdown();
        SERVICE.shutdown();
    }

    /**
     * 获取ringBuffer剩余量是否低于RINGBUFFER_SIZE * 0.1
     *
     * @return boolean
     */
    private static boolean hasCapacity() {
        return (ringBuffer.remainingCapacity() < RINGBUFFER_SIZE * 0.1);
    }

    public static void main(String[] args) {

        init();//初始化
        handleEventsWith(new EventHandler[]{new DeliveryReportEventHandler(1), new DeliveryReportEventHandler(2)});//添加2个消费者
        start();//启动disruptor

        //生产10个商品
        for (int i = 0; i < 10; i++) {
            ValueEvent valueEvent = new ValueEvent();
            valueEvent.setValue(UUID.randomUUID().toString());
            addEVent(valueEvent);
        }
        //停止
        shutdown();
    }

}


执行结果内容


DeliveryReportEventHandler{id=2} event: de9b74ab-0911-44c1-8166-86ac184b922e sequence: 0 endOfBatch: false
DeliveryReportEventHandler{id=1} event: de9b74ab-0911-44c1-8166-86ac184b922e sequence: 0 endOfBatch: false
DeliveryReportEventHandler{id=1} event: a17f5b7d-35bf-4181-86ee-45f54dfea6e1 sequence: 1 endOfBatch: true
DeliveryReportEventHandler{id=2} event: a17f5b7d-35bf-4181-86ee-45f54dfea6e1 sequence: 1 endOfBatch: false
DeliveryReportEventHandler{id=1} event: 18d2fbb3-2dc5-42cd-977c-1eee9e00b9f3 sequence: 2 endOfBatch: false
DeliveryReportEventHandler{id=2} event: 18d2fbb3-2dc5-42cd-977c-1eee9e00b9f3 sequence: 2 endOfBatch: true
DeliveryReportEventHandler{id=1} event: 048ac24f-e196-4dd8-b88e-6a8b87b49cac sequence: 3 endOfBatch: false
DeliveryReportEventHandler{id=2} event: 048ac24f-e196-4dd8-b88e-6a8b87b49cac sequence: 3 endOfBatch: false
DeliveryReportEventHandler{id=1} event: 1c850ece-787f-49ab-91b0-37ff812a8d94 sequence: 4 endOfBatch: false
DeliveryReportEventHandler{id=2} event: 1c850ece-787f-49ab-91b0-37ff812a8d94 sequence: 4 endOfBatch: false
DeliveryReportEventHandler{id=1} event: a6dec83c-f582-42f2-8dd3-ed311af4e41d sequence: 5 endOfBatch: false
DeliveryReportEventHandler{id=2} event: a6dec83c-f582-42f2-8dd3-ed311af4e41d sequence: 5 endOfBatch: false
DeliveryReportEventHandler{id=1} event: ea0cdd6e-7326-473e-8b7a-e16b8e79c51d sequence: 6 endOfBatch: false
DeliveryReportEventHandler{id=2} event: ea0cdd6e-7326-473e-8b7a-e16b8e79c51d sequence: 6 endOfBatch: false
DeliveryReportEventHandler{id=1} event: b8ef68fe-67b7-4575-b7ed-921e265d3487 sequence: 7 endOfBatch: false
DeliveryReportEventHandler{id=2} event: b8ef68fe-67b7-4575-b7ed-921e265d3487 sequence: 7 endOfBatch: false
DeliveryReportEventHandler{id=1} event: 559c0e40-8e00-485f-92f4-b2147ce88d2f sequence: 8 endOfBatch: false
DeliveryReportEventHandler{id=2} event: 559c0e40-8e00-485f-92f4-b2147ce88d2f sequence: 8 endOfBatch: false
DeliveryReportEventHandler{id=1} event: 4a87ecf4-f05f-4e2e-863c-6430ac9729cc sequence: 9 endOfBatch: true
DeliveryReportEventHandler{id=2} event: 4a87ecf4-f05f-4e2e-863c-6430ac9729cc sequence: 9 endOfBatch: true


Logo

旨在为数千万中国开发者提供一个无缝且高效的云端环境,以支持学习、使用和贡献开源项目。

更多推荐