zl程序教程

您现在的位置是:首页 >  其它

当前栏目

003-disruptor样例

样例 003 Disruptor
2023-09-14 09:08:45 时间

一、概述

依据Disruptor的工作流依次执行的特性,实现各种样例

具体而言,它可以解决如下方面:
- 并行计算实现;
- 串行依次执行;
- 菱形方式执行;
- 链式并行计算。
并且基于以上情况,每种类型的消费者都可以池化,默认初始化多个同一类型的消费者实例,并行处理,提高系统吞吐量。

1.1、样例

一个生产者生产一个Long类型的数值,消费者对该数值进行处理的操作。本样例对以上各种情况的实现只是disruptor注册消费者的方式不同,因此,我们先把事件类、事件工厂类、消费者类、事件转换类和主函数贴出来。

事件类

public class LongEvent {
    private Long number;

    public Long getNumber() {
        return number;
    }

    public void setNumber(Long number) {
        this.number = number;
    }
}

事件工厂类

public class LongEventFactory implements EventFactory<LongEvent> {
    @Override
    public LongEvent newInstance() {
        return new LongEvent();
    }
}

事件转换类

public class LongEventTranslator implements EventTranslatorOneArg<LongEvent, Long> {
    @Override
    public void translateTo(LongEvent event, long sequence, Long arg0) {
        event.setNumber(arg0);
    }
}

C1-1消费者类

该消费者执行将数值+10的操作。可以看到该消费者同时实现了EventHandler和WorkHandler两个接口。如果不需要池化,只需要实现EventHandler类即可。如果需要池化,只需要实现WorkHandler类即可。本例为了能够同时讲解池化和非池化的实现,因此同时实现了两个类。

/**
 * 值+10
 */
public class C11EventHandler implements EventHandler<LongEvent>, WorkHandler<LongEvent> {
    @Override
    public void onEvent(LongEvent event, long sequence, boolean endOfBatch) throws Exception {
        long number = event.getNumber();
        number += 10;
        System.out.println(System.currentTimeMillis() + ":threadid:"+Thread.currentThread().getId()+": c1-1 consumer finished.number=" + number);
    }

    @Override
    public void onEvent(LongEvent event) throws Exception {
        long number = event.getNumber();
        number += 10;
        System.out.println(System.currentTimeMillis() + ":threadid:"+Thread.currentThread().getId()+ ": c1-1 consumer finished.number=" + number);
    }
}

C1-2消费者类

该消费者类执行将数值乘以10的操作。

/**
 * 值乘以10
 */
public class C12EventHandler implements EventHandler<LongEvent>, WorkHandler<LongEvent> {
    @Override
    public void onEvent(LongEvent event, long sequence, boolean endOfBatch) throws Exception {
        long number = event.getNumber();
        number *= 10;
        System.out.println(System.currentTimeMillis()+ ":threadid:"+Thread.currentThread().getId()+": c1-2 consumer finished.number=" + number);
    }

    @Override
    public void onEvent(LongEvent event) throws Exception {
        long number = event.getNumber();
        number *= 10;
        System.out.println(System.currentTimeMillis()+ ":threadid:"+Thread.currentThread().getId()+": c1-2 consumer finished.number=" + number);
    }
}

c2-1消费者类

该消费者类负责将数值+20.

/**
 * 值+20
 */
public class C21EventHandler implements EventHandler<LongEvent>, WorkHandler<LongEvent> {
    @Override
    public void onEvent(LongEvent event, long sequence, boolean endOfBatch) throws Exception {
        long number = event.getNumber();
        number += 20;
        System.out.println(System.currentTimeMillis()+ ":threadid:"+Thread.currentThread().getId()+": c2-1 consumer finished.number=" + number);
    }

    @Override
    public void onEvent(LongEvent event) throws Exception {
        long number = event.getNumber();
        number += 20;
        System.out.println(System.currentTimeMillis()+ ":threadid:"+Thread.currentThread().getId()+": c2-1 consumer finished.number=" + number);
    }
}

C2-2消费者类

该消费者类负责将数值*20

/**
 * 值*20
 */
public class C22EventHandler implements EventHandler<LongEvent>, WorkHandler<LongEvent> {
    @Override
    public void onEvent(LongEvent event, long sequence, boolean endOfBatch) throws Exception {
        long number = event.getNumber();
        number *= 20;
        System.out.println(System.currentTimeMillis() + ":threadid:"+Thread.currentThread().getId()+ ": c2-2 consumer finished.number=" + number);
    }

    @Override
    public void onEvent(LongEvent event) throws Exception {
        long number = event.getNumber();
        number *= 20;
        System.out.println(System.currentTimeMillis() +  ":threadid:"+Thread.currentThread().getId()+": c2-2 consumer finished.number=" + number);
    }
}

主函数

    public static void main(String[] args) {
        int bufferSize = 1024 * 1024;//环形队列长度,必须是2的N次方
        EventFactory<LongEvent> eventFactory = new LongEventFactory();
        /**
         * 定义Disruptor,基于单生产者,阻塞策略
         */
        Disruptor<LongEvent> disruptor = new Disruptor<LongEvent>(eventFactory, bufferSize, Executors.defaultThreadFactory(), ProducerType.SINGLE, new BlockingWaitStrategy());
        /////////////////////////////////////////////////////////////////////
        parallel(disruptor);//这里是调用各种不同方法的地方.
        /////////////////////////////////////////////////////////////////////
        RingBuffer<LongEvent> ringBuffer = disruptor.getRingBuffer();
        /**
         * 输入10
         */
        ringBuffer.publishEvent(new LongEventTranslator(), 10L);
        ringBuffer.publishEvent(new LongEventTranslator(), 100L);
    }

 

1.1、并行计算实现

   

  并行计算就是消费者之间互相不依赖,并行执行,执行开始时间是一样的。

    /**
     * 并行计算实现,c1,c2互相不依赖
     * <br/>
     * p --> c11
     * --> c21
     */
    public static void parallel(Disruptor<LongEvent> disruptor) {
        disruptor.handleEventsWith(new C11EventHandler(), new C21EventHandler());
        disruptor.start();
    }

  输出

1593409707700:threadid:13: c2-1 consumer finished.number=30
1593409707700:threadid:12: c1-1 consumer finished.number=20
1593409707701:threadid:13: c2-1 consumer finished.number=120
1593409707701:threadid:12: c1-1 consumer finished.number=110

1.2、串行计算,一次执行

  

    /**
     * 串行依次执行
     * <br/>
     * p --> c11 --> c21
     * @param disruptor
     */
    public static void serial(Disruptor<LongEvent> disruptor){
        disruptor.handleEventsWith(new C11EventHandler()).then(new C21EventHandler());
        disruptor.start();
    }

输出

1593409855297:threadid:12: c1-1 consumer finished.number=20
1593409855297:threadid:12: c1-1 consumer finished.number=110
1593409855305:threadid:13: c2-1 consumer finished.number=30
1593409855305:threadid:13: c2-1 consumer finished.number=120

1.3、菱形方式执行

  

    /**
     * 菱形方式执行
     * <br/>
     *   --> c11
     * p          --> c21
     *   --> c12
     * @param disruptor
     */
    public static void diamond(Disruptor<LongEvent> disruptor){
        disruptor.handleEventsWith(new C11EventHandler(),new C12EventHandler()).then(new C21EventHandler());
        disruptor.start();
    }

 输出

1593410111008:threadid:13: c1-2 consumer finished.number=100
1593410111008:threadid:12: c1-1 consumer finished.number=20
1593410111008:threadid:12: c1-1 consumer finished.number=110
1593410111008:threadid:13: c1-2 consumer finished.number=1000
1593410111016:threadid:14: c2-1 consumer finished.number=30
1593410111016:threadid:14: c2-1 consumer finished.number=120

1.4、链式并行计算

    /**
     * 链式并行计算
     * <br/>
     *   --> c11 --> c12
     * p
     *   --> c21 --> c22
     * @param disruptor
     */
    public static void chain(Disruptor<LongEvent> disruptor){
        disruptor.handleEventsWith(new C11EventHandler()).then(new C12EventHandler());
        disruptor.handleEventsWith(new C21EventHandler()).then(new C22EventHandler());
        disruptor.start();
    }

输出:只是10 时

1593410218656:threadid:12: c1-1 consumer finished.number=20
1593410218656:threadid:14: c2-1 consumer finished.number=30
1593410218665:threadid:15: c2-2 consumer finished.number=200
1593410218665:threadid:13: c1-2 consumer finished.number=100

上面的实例,每一种消费者都只有一个实例,如果想多个实例形成一个线程池并发处理多个任务怎么办?如果使用disruptor.handleEventWith(new C11EventHandler(),new C11EventHandler(),...)这种,会造成重复消费同一个数据,不是我们想要的。我们想要的是同一个类的实例消费不同的数据,怎么办?
- 首先,消费者类需要实现WorkHandler接口,而不是EventHandler接口。为了方便,我们同时实现了这两个接口。
- 其次,disruptor调用handleEventsWithWorkerPool方法,而不是handleEventsWith方法
- 最后,实例化多个事件消费类。

1.5、并行计算实现,c1,c2互相不依赖,同时C1,C2分别有2个实例

    /**
     * 并行计算实现,c1,c2互相不依赖,同时C1,C2分别有2个实例
     * <br/>
     * p --> c11
     *   --> c21
     */
    public static void parallelWithPool(Disruptor<LongEvent> disruptor){
        disruptor.handleEventsWithWorkerPool(new C11EventHandler(),new C11EventHandler());
        disruptor.handleEventsWithWorkerPool(new C21EventHandler(),new C21EventHandler());
        disruptor.start();
    }
    /**

输出【消息至10,100,200】

1593410807336:threadid:14: c2-1 consumer finished.number=120
1593410807336:threadid:15: c2-1 consumer finished.number=30
1593410807336:threadid:13: c1-1 consumer finished.number=110
1593410807336:threadid:12: c1-1 consumer finished.number=20
1593410807336:threadid:13: c1-1 consumer finished.number=210
1593410807336:threadid:14: c2-1 consumer finished.number=220

1.6、串行依次执行,同时C11,C21分别有2个实例

    /**
     * 串行依次执行,同时C11,C21分别有2个实例
     * <br/>
     * p --> c11 --> c21
     * @param disruptor
     */
    public static void serialWithPool(Disruptor<LongEvent> disruptor){
        disruptor.handleEventsWithWorkerPool(new C11EventHandler(),new C11EventHandler()).then(new C21EventHandler(),new C21EventHandler());
        disruptor.start();
    }

输出:【消息值10】 

1593411112619:threadid:12: c1-1 consumer finished.number=20
1593411112624:threadid:15: c2-1 consumer finished.number=30
1593411112624:threadid:14: c2-1 consumer finished.number=30