zl程序教程

您现在的位置是:首页 >  前端

当前栏目

SpringCloudRPC调用核心原理:RxJava响应式编程框架,聚合操作符

响应rxjava框架编程原理 调用 核心 聚合
2023-06-13 09:14:10 时间

聚合操作符

本节介绍RxJava的两个聚合型操作符:count操作符和reduce操作符。

count操作符

count操作符用来对源Observable流的数据项进行计数,最后将总数弹射出来;如果源流弹射错误,就会将错误直接报出来;在源Observable流没有终止前,count操作符是不会弹射统计数据的。使用count操作符对数据流序列进行计数,具体的执行流程如图4-9所示。

图4-9 使用count操作符对数据流序列进行计数

下面是一个使用count操作符的简单例子,代码如下:

package com.crazymaker.demo.rxJava.basic;//省略import@Slf4jpublic class AggregateDemo{ /** *演示count计数操作符 */ @Test public void countDemo() { String[] items = {"one", "two", "three", "four"}; Integer count = Observable .from(items) .count() .toBlocking().single(); log.info("计数的结果为 {}",count); }}

运行以上代码,输出的结果节选如下:

[main] INFO c.c.d.r.basic.AggregateDemo - 计数的结果为 4可以看出,count操作符将一个Observable源流转换成一个弹射单个值的Observable输出流,输出流的唯一数据项的值为原始Observable流所弹射的数据项数量。

在上面的代码中,为了获取count输出流中的数据项,使用了toBlocking()和single()两个操作符。其中,Observable.toBlocking()操作返回了一个BlockingObservable阻塞型实例,该类型不是一种新的数据流,仅仅是对源Observable的包装,只是该类型会阻塞当前线程,一直等待直到内部的源Observable弹射了自己想要的数据。BlockingObservable.single()方法表示阻塞当前线程,直到从封装的源Observable获取到唯一的弹射数据元素项,如果Observable源流弹射出的数据元素不止一个,single()方法就会抛出异常。

reduce操作符

Reduce(归约)操作符对一个Observable流序列的每一项应用一个归约函数,最后将流的最终归约计算结果弹射出去。除了第一项之外,reduce操作符会将上一个数据项应用归约函数的结果作为下一个数据项在应用归约函数时的输入。所以,和scan操作符一样,reduce操作符也有点类似递归操作。

假定归约函数为一个简单的累加函数,然后使用reduce操作符对1~5的数据流序列进行归约,其具体的归约流程如图4-10所示。

图4-10 reduce操作符对1~5的数据流序列的归约流程

使用reduce操作符实现对1~5的数据流序列的归约,参考如下的实现代码:

package com.crazymaker.demo.rxJava.basic;//省略import@Slf4jpublic class AggregateDemo{ /**演示操作符 *演示reduce操作符 */ @Test public void reduceDemo() { /** *定义一个accumulator归约函数 */ Func2<Integer, Integer, Integer> accumulator = new Func2<Integer, Integer, Integer>() { @Override public Integer call(Integer input1, Integer input2) { log.info(" {} + {} = {} ", input1, input2, input1 + input2); return input1 + input2; } }; /** *使用reduce进行流归约 */ Observable.range(, ) .reduce(accumulator) .subscribe(new Action1<Integer>() { @Override public void call(Integer sum) { log.info(" 归约的结果: {} ", sum); } }); }}

运行以上代码,输出的结果节选如下:

[main] INFO c.c.d.r.basic.AggregateDemo -  +  = [main] INFO c.c.d.r.basic.AggregateDemo -  +  = [main] INFO c.c.d.r.basic.AggregateDemo -  +  = [main] INFO c.c.d.r.basic.AggregateDemo -  +  = [main] INFO c.c.d.r.basic.AggregateDemo - 归约的结果: 

以上实例代码中,reduce操作符对原始Observable流所弹射的第一项数据1应用归约函数,得到中间结果1;然后将第一个中间结果1连同原始流的第二项数据2一起填充给accumulator归约函数,得到中间结果3。reduce持续对原始流进行迭代,一直到原始流的最后一个数据项5,reduce将5连同中间结果10一起填充给accumulator归约函数,得到最终结果15。最后,reduce会将最终结果15作为输出流的数据项弹射出去。reduce操作符与前面介绍的scan操作符很类似,只是scan会弹出每次计算的中间结果,而reduce只会弹出最后的结果。

本文给大家讲解的内容是SpringCloudRPC远程调用核心原理:RxJava响应式编程框架,聚合操作符

  1. 下篇文章给大家讲解的是SpringCloudRPC远程调用核心原理:RxJava响应式编程框架,其他操作符;
  2. 觉得文章不错的朋友可以转发此文关注小编;
  3. 感谢大家的支持!

本文就是愿天堂没有BUG给大家分享的内容,大家有收获的话可以分享下,想学习更多的话可以到微信公众号里找我,我等你哦。