如何从JDK8 Stream转换为反应式框架流?
最近在做一个项目,获取JDK8 Stream对象后,想要批量消费,不想自己写个集合来做批量处理。而反应式编程实现比如rxjava或者reactor是有丰富的流操作符,所以调研了下如何把JDK8 Stream转换为反应式流。
二、批量消费
有时候场景需要我们批量消费以便提高执行效率,比如对应同一个表的插入操作,批量插入的效率比单条逐个插入效率要好很多。那么对应给定的一个数据源,如何聚合数据为批量那?当数据源是一个内存list时候,最简单方法如下:
public static void main(String[] ar) { //1.创建list List Integer personList = new ArrayList Integer for (int i = 0; i 100; ++i) { personList.add(i); //2.切分处理 List List Integer list = Lists.partition(personList, 20); list.stream().forEach(tempList- System.out.println(JSON.toJSONString(tempList)));
如上代码1创建了一个list列表,代码2,使用Google guava包里面的Lists.partition函数把list切分为一个个最多包含20个元素的list列表,并打印输出。
但是当数据源是个流那,比如文件流或者数据库连接流等等(一般为Stream对象),这时候我们不可能把流下的所有数据全部加载到内存,然后在使用上面的Lists.partition方法,因为这样可能会造成OOM。
我们想要的是从这些流中每次读取limit条记录,然后批量处理这limit条记录,这样内存中每次只会存在limit条记录。这时由于JDK Stream不支持Buffer操作,我们需要自己实现,实现代码大概如下:
//1.缓存列表 List Integer mergeList = new ArrayList (); int limit = 20; //2.循环获取元素并缓存 stream.forEach(e - { if (mergeList.size() = limit) { System.out.println(JSON.toJSONString(mergeList)); mergeList.clear(); mergeList.add(e); //3.退出后,补漏处理 if (mergeList.size() 0){ System.out.println(JSON.toJSONString(mergeList));
如上代码在Stream中迭代元素时,我们把元素缓存到mergeList列表,每当mergeList有了20个元素,则处理一次。最后等流结束后,如果mergeList还有元素则需要补漏处理下。
如果不想实现上面繁琐代码,我们可以考虑吧JDK 8Stream切换到反应式实现框架比如Reactor或者Rxjava,因为后者有丰富的流操作符。其中Reactor的一个实现是:
//1.为了使用buffer功能,转换为Reactor的流对象Flux Flux flux = Flux.fromStream(stream); //2..聚合消费 flux.buffer(20).subscribe(integers - { System.out.println(JSON.toJSONString(integers));
如上代码,我们使用Reactor框架的Flux.fromStream方法把JDKStream转换为Flux流对象,然后调用其buffer方法设置缓存20个元素消费一次,然后调用subscribe订阅缓存流,并打印。
可知代码简洁很多,并且符合声明式编程。
在Java 8中引入了Stream,它旨在有效地处理数据流(包括原始类型)。
它是基于拉的,并且只能使用一次,但是缺少与时间相关的操作(比如buffer、window操作),虽然可以执行并行计算(基于ForkJoinPool.commonPool()),但无法指定用业务自己的线程池。另外它也还没有设计用于处理延迟操作(比如rxjava的defer()操作)。其所不支持的特性就是Reactor或RxJava等Reactive API的用武之地。
原创文章,转载请注明: 转载自并发编程网 – ifeve.com本文链接地址: 如何从JDK8 Stream转换为反应式框架流?
相关文章
- 【数据结构与算法】深入浅出递归和迭代的通用转换思想[通俗易懂]
- Java基础入门笔记05——面向对象,创建对象的内存分析,继承,封装,多态,object类,this&&super,方法重写,引用类型的强制转换,instanceof,抽象类,内部类,接口,异常。[通
- pytorch: tensor与numpy之间的转换[通俗易懂]
- 完美解决文件格式转换问题
- MySQL实现秒数快速转换为时分秒(mysql秒转时分秒)
- 的转换从MySQL迁移数据到ElasticSearch(mysql到es)
- MySQL:从字符到二进制的转换(mysql字符转二进制)
- MySQL转换至MSSQL:一次成功的迁移过程(mysql转成mssql)
- Oracle CHR13在跨平台字符集转换中的重要作用(oracle chr13)
- c#实现16进制和字符串之间转换的代码
- php中将网址转换为超链接的函数
- java中字符串与日期的转换实例
- phpcookie名使用点号(句号)会被转换