003-disruptor样例
一、概述
依据Disruptor的工作流依次执行的特性,实现各种样例
具体而言,它可以解决如下方面:
- 并行计算实现;
- 串行依次执行;
- 菱形方式执行;
- 链式并行计算。
并且基于以上情况,每种类型的消费者都可以池化,默认初始化多个同一类型的消费者实例,并行处理,提高系统吞吐量。
1.1、样例
一个生产者生产一个Long类型的数值,消费者对该数值进行处理的操作。本样例对以上各种情况的实现只是disruptor注册消费者的方式不同,因此,我们先把事件类、事件工厂类、消费者类、事件转换类和主函数贴出来。
事件类
public class LongEvent { private Long number; public Long getNumber() { return number; } public void setNumber(Long number) { this.number = number; } }
事件工厂类
public class LongEventFactory implements EventFactory<LongEvent> { @Override public LongEvent newInstance() { return new LongEvent(); } }
事件转换类
public class LongEventTranslator implements EventTranslatorOneArg<LongEvent, Long> { @Override public void translateTo(LongEvent event, long sequence, Long arg0) { event.setNumber(arg0); } }
C1-1消费者类
该消费者执行将数值+10的操作。可以看到该消费者同时实现了EventHandler和WorkHandler两个接口。如果不需要池化,只需要实现EventHandler类即可。如果需要池化,只需要实现WorkHandler类即可。本例为了能够同时讲解池化和非池化的实现,因此同时实现了两个类。
/** * 值+10 */ public class C11EventHandler implements EventHandler<LongEvent>, WorkHandler<LongEvent> { @Override public void onEvent(LongEvent event, long sequence, boolean endOfBatch) throws Exception { long number = event.getNumber(); number += 10; System.out.println(System.currentTimeMillis() + ":threadid:"+Thread.currentThread().getId()+": c1-1 consumer finished.number=" + number); } @Override public void onEvent(LongEvent event) throws Exception { long number = event.getNumber(); number += 10; System.out.println(System.currentTimeMillis() + ":threadid:"+Thread.currentThread().getId()+ ": c1-1 consumer finished.number=" + number); } }
C1-2消费者类
该消费者类执行将数值乘以10的操作。
/** * 值乘以10 */ public class C12EventHandler implements EventHandler<LongEvent>, WorkHandler<LongEvent> { @Override public void onEvent(LongEvent event, long sequence, boolean endOfBatch) throws Exception { long number = event.getNumber(); number *= 10; System.out.println(System.currentTimeMillis()+ ":threadid:"+Thread.currentThread().getId()+": c1-2 consumer finished.number=" + number); } @Override public void onEvent(LongEvent event) throws Exception { long number = event.getNumber(); number *= 10; System.out.println(System.currentTimeMillis()+ ":threadid:"+Thread.currentThread().getId()+": c1-2 consumer finished.number=" + number); } }
c2-1消费者类
该消费者类负责将数值+20.
/** * 值+20 */ public class C21EventHandler implements EventHandler<LongEvent>, WorkHandler<LongEvent> { @Override public void onEvent(LongEvent event, long sequence, boolean endOfBatch) throws Exception { long number = event.getNumber(); number += 20; System.out.println(System.currentTimeMillis()+ ":threadid:"+Thread.currentThread().getId()+": c2-1 consumer finished.number=" + number); } @Override public void onEvent(LongEvent event) throws Exception { long number = event.getNumber(); number += 20; System.out.println(System.currentTimeMillis()+ ":threadid:"+Thread.currentThread().getId()+": c2-1 consumer finished.number=" + number); } }
C2-2消费者类
该消费者类负责将数值*20
/** * 值*20 */ public class C22EventHandler implements EventHandler<LongEvent>, WorkHandler<LongEvent> { @Override public void onEvent(LongEvent event, long sequence, boolean endOfBatch) throws Exception { long number = event.getNumber(); number *= 20; System.out.println(System.currentTimeMillis() + ":threadid:"+Thread.currentThread().getId()+ ": c2-2 consumer finished.number=" + number); } @Override public void onEvent(LongEvent event) throws Exception { long number = event.getNumber(); number *= 20; System.out.println(System.currentTimeMillis() + ":threadid:"+Thread.currentThread().getId()+": c2-2 consumer finished.number=" + number); } }
主函数
public static void main(String[] args) { int bufferSize = 1024 * 1024;//环形队列长度,必须是2的N次方 EventFactory<LongEvent> eventFactory = new LongEventFactory(); /** * 定义Disruptor,基于单生产者,阻塞策略 */ Disruptor<LongEvent> disruptor = new Disruptor<LongEvent>(eventFactory, bufferSize, Executors.defaultThreadFactory(), ProducerType.SINGLE, new BlockingWaitStrategy()); ///////////////////////////////////////////////////////////////////// parallel(disruptor);//这里是调用各种不同方法的地方. ///////////////////////////////////////////////////////////////////// RingBuffer<LongEvent> ringBuffer = disruptor.getRingBuffer(); /** * 输入10 */ ringBuffer.publishEvent(new LongEventTranslator(), 10L); ringBuffer.publishEvent(new LongEventTranslator(), 100L); }
1.1、并行计算实现
并行计算就是消费者之间互相不依赖,并行执行,执行开始时间是一样的。
/** * 并行计算实现,c1,c2互相不依赖 * <br/> * p --> c11 * --> c21 */ public static void parallel(Disruptor<LongEvent> disruptor) { disruptor.handleEventsWith(new C11EventHandler(), new C21EventHandler()); disruptor.start(); }
输出
1593409707700:threadid:13: c2-1 consumer finished.number=30 1593409707700:threadid:12: c1-1 consumer finished.number=20 1593409707701:threadid:13: c2-1 consumer finished.number=120 1593409707701:threadid:12: c1-1 consumer finished.number=110
1.2、串行计算,一次执行
/** * 串行依次执行 * <br/> * p --> c11 --> c21 * @param disruptor */ public static void serial(Disruptor<LongEvent> disruptor){ disruptor.handleEventsWith(new C11EventHandler()).then(new C21EventHandler()); disruptor.start(); }
输出
1593409855297:threadid:12: c1-1 consumer finished.number=20 1593409855297:threadid:12: c1-1 consumer finished.number=110 1593409855305:threadid:13: c2-1 consumer finished.number=30 1593409855305:threadid:13: c2-1 consumer finished.number=120
1.3、菱形方式执行
/** * 菱形方式执行 * <br/> * --> c11 * p --> c21 * --> c12 * @param disruptor */ public static void diamond(Disruptor<LongEvent> disruptor){ disruptor.handleEventsWith(new C11EventHandler(),new C12EventHandler()).then(new C21EventHandler()); disruptor.start(); }
输出
1593410111008:threadid:13: c1-2 consumer finished.number=100 1593410111008:threadid:12: c1-1 consumer finished.number=20 1593410111008:threadid:12: c1-1 consumer finished.number=110 1593410111008:threadid:13: c1-2 consumer finished.number=1000 1593410111016:threadid:14: c2-1 consumer finished.number=30 1593410111016:threadid:14: c2-1 consumer finished.number=120
1.4、链式并行计算
/** * 链式并行计算 * <br/> * --> c11 --> c12 * p * --> c21 --> c22 * @param disruptor */ public static void chain(Disruptor<LongEvent> disruptor){ disruptor.handleEventsWith(new C11EventHandler()).then(new C12EventHandler()); disruptor.handleEventsWith(new C21EventHandler()).then(new C22EventHandler()); disruptor.start(); }
输出:只是10 时
1593410218656:threadid:12: c1-1 consumer finished.number=20 1593410218656:threadid:14: c2-1 consumer finished.number=30 1593410218665:threadid:15: c2-2 consumer finished.number=200 1593410218665:threadid:13: c1-2 consumer finished.number=100
上面的实例,每一种消费者都只有一个实例,如果想多个实例形成一个线程池并发处理多个任务怎么办?如果使用disruptor.handleEventWith(new C11EventHandler(),new C11EventHandler(),...)这种,会造成重复消费同一个数据,不是我们想要的。我们想要的是同一个类的实例消费不同的数据,怎么办?
- 首先,消费者类需要实现WorkHandler接口,而不是EventHandler接口。为了方便,我们同时实现了这两个接口。
- 其次,disruptor调用handleEventsWithWorkerPool方法,而不是handleEventsWith方法
- 最后,实例化多个事件消费类。
1.5、并行计算实现,c1,c2互相不依赖,同时C1,C2分别有2个实例
/** * 并行计算实现,c1,c2互相不依赖,同时C1,C2分别有2个实例 * <br/> * p --> c11 * --> c21 */ public static void parallelWithPool(Disruptor<LongEvent> disruptor){ disruptor.handleEventsWithWorkerPool(new C11EventHandler(),new C11EventHandler()); disruptor.handleEventsWithWorkerPool(new C21EventHandler(),new C21EventHandler()); disruptor.start(); } /**
输出【消息至10,100,200】
1593410807336:threadid:14: c2-1 consumer finished.number=120 1593410807336:threadid:15: c2-1 consumer finished.number=30 1593410807336:threadid:13: c1-1 consumer finished.number=110 1593410807336:threadid:12: c1-1 consumer finished.number=20 1593410807336:threadid:13: c1-1 consumer finished.number=210 1593410807336:threadid:14: c2-1 consumer finished.number=220
1.6、串行依次执行,同时C11,C21分别有2个实例
/** * 串行依次执行,同时C11,C21分别有2个实例 * <br/> * p --> c11 --> c21 * @param disruptor */ public static void serialWithPool(Disruptor<LongEvent> disruptor){ disruptor.handleEventsWithWorkerPool(new C11EventHandler(),new C11EventHandler()).then(new C21EventHandler(),new C21EventHandler()); disruptor.start(); }
输出:【消息值10】
1593411112619:threadid:12: c1-1 consumer finished.number=20 1593411112624:threadid:15: c2-1 consumer finished.number=30 1593411112624:threadid:14: c2-1 consumer finished.number=30
相关文章
- java tess4j 示例_java 使用tess4j实现OCR的最简单样例[通俗易懂]
- Winform布局美化样例
- XLSTransformer生成excel文件简单演示样例「建议收藏」
- adminLTE模态框弹出页面样例[通俗易懂]
- java分页查询(oracle)dao样例
- spring RestTemplate调用OAuth2客户端授权接口样例
- sluaunreal插件使用样例
- MySQL 常用时间范围查询SQL样例
- Java线程调度ScheduledThreadPoolExecutor简单使用样例详解编程语言
- Mongodb driver for java 使用样例详解大数据
- Oracle数据库:实用样例指南(oracle数据库样例)
- 实践Oracle:操作样例数据库(oracle样例数据库)
- java 获取新浪天气样例详解编程语言
- Linux操作系统样例实践(linuxsample)
- Prometheus 常用 PromQL 语句样例
- Ubuntu Touch 的邮件客户端样例的设计看起来很炫哦
- Redis配置文件示例及使用说明(redis配置文件样例)
- MySQL实例分析:学习如何管理大型数据库(mysql样例)
- SQL Server样例库:一个强大的学习工具(sqlserver样例库)
- div+css排版的样例
- 基于jquery的滑动样例代码
- 用python+hadoopstreaming分布式编程(一)--原理介绍,样例程序与本地调试