zl程序教程

您现在的位置是:首页 >  移动开发

当前栏目

rxjava之操作符

rxjava 操作符
2023-09-14 08:59:52 时间

1.1.转换类操作符(map  flatMap  concatMap  flatMapIterable   switchMap   scan   groupBy...);

1.2过滤类操作符(fileter take takeLast takeUntil distinct distinctUntilChanged skip skipLast ...);

1.3 RxJava系列之组合操作符()

    merge, zip, join组合符有什么区别?

RxJava中的操作符(Operators),RxJava中的操作符主要三类:

转换类操作符(map  flatMap  concatMap  flatMapIterable   switchMap   scan   groupBy...);

组合类操作符(merge   zip   join   combineLatest    and/when/then   switch   startSwitch...)。

过滤类操作符(fileter take takeLast takeUntil distinct distinctUntilChanged skip skipLast ...);

---------------------

三. 转换操作符和组合操作符的介绍?

2.1  FlatMap和Map以及concatMap区别?
map和flatMap都可以对RxJava传入的数据进行变换。
map 对数据进行变换后,可以返回任意值。map对数据的变换是1对1进行的;
flatMap对数据变换后,返回ObservableSource对象。可以对数据进行一对多,多对多的变换。flatMap并不保证数据有序。
concatMap与flatMap使用基本一致,它可以保证数据有序。

1. map的使用:
Observable.just("学生1", "学生2", "学生3")
.map(new Function<String, String>() {
@Override
public String apply(String s) throws Throwable {
return s;
}
})
.subscribe(t -> {
Log.d("=====", t);
});
输入三条数据,最后subscribe结果也是三条。打印结果为:
=====: 学生1
=====: 学生2
=====: 学生3

2.flatMap的使用:这里需要使用到一个简单的工具类。
public class Student {
public String name;
public int no;
}
创建对应数据的方法:
private List<Student> getData() {
List<Student> data = new ArrayList<>();
Student stu = null;
for (int i = 0; i < 5; i ++) {
stu = new Student();
stu.name = "学生" + i;
stu.no = i;
data.add(stu);
}
return data;
}
List<Student> mData = getData();

1对1的情况
Observable.just(mData)
.flatMap(new Function<List<Student>, ObservableSource<?>>() {
@Override
public ObservableSource<?> apply(List<Student> students){
return Observable.create(new ObservableOnSubscribe<List<Student>>() {
@Override
public void subscribe(@NonNull ObservableEmitter<List<Student>> emitter) {
   emitter.onNext(students);
  }
});
}
})
.subscribe(t -> {
Log.d("=====", t.toString());
});

从打印日志可以看到,这里输入一个List对象,subscribe中也得到一个List的对象。下面我们来看一对多的情况
=====:
[com.example.demorxjava3.Student@35b2f4b, com.example.demorxjava3.Student@53e2d28, com.example.demorxjava3.Student@22b9a41, com.example.demorxjava3.Student@4d6abe6, com.example.demorxjava3.Student@1ab7527]

1对多:
Observable.just(mData)
.flatMap(new Function<List<Student>, ObservableSource<?>>() {
@Override
public ObservableSource<?> apply(List<Student> students) throws Throwable {
return Observable.fromIterable(students);
}
})
.subscribe(t -> {
Log.d("=====", t.toString());
});
从打印结果可以看到,这里分别打印了五个Student对象
=====: com.example.demorxjava3.Student@35b2f4b
=====: com.example.demorxjava3.Student@53e2d28
=====: com.example.demorxjava3.Student@22b9a41
=====: com.example.demorxjava3.Student@4d6abe6
=====: com.example.demorxjava3.Student@1ab7527

多对多(减少对象)
Observable.just("学生1", "学生2", "学生3")
.flatMap(new Function<String, ObservableSource<?>>() {
@Override
public ObservableSource<?> apply(String s) throws Throwable {
if ("学生2".equals(s)) {
return Observable.empty();
}
return Observable.just(s);
}
})
.subscribe(t -> {
Log.d("=====", t.toString());
});
在这里我们输入了三条数据,在flatMap中,第二条数据返回的是空Observable对象,会被忽略,所以我们会看到最后数据只剩下两条。
=====: 学生1
=====: 学生3

多对多(增加对象)
Observable.just("学生1", "学生2", "学生3")
.flatMap(new Function<String, ObservableSource<?>>() {
@Override
public ObservableSource<?> apply(String s) throws Throwable {
if ("学生2".equals(s)) {
return Observable.just(s, "学生N");
}
return Observable.just(s);
}
})
.subscribe(t -> {
Log.d("=====", t.toString());
});
这里我们增加了一个学生N。
=====: 学生1
=====: 学生2
=====: 学生N
=====: 学生3

flatMap并不保证数据有序:
Observable.just("学生1", "学生2", "学生3", "学生4", "学生5")
.flatMap(new Function<String, ObservableSource<?>>() {
@Override
public ObservableSource<?> apply(String s) throws Throwable {
return Observable.just(s).delay(200, TimeUnit.MILLISECONDS);
}
})
.subscribe(t -> {
Log.d("=====", t.toString());
});
其中一次输出结果为:
=====: 学生2
=====: 学生1
=====: 学生3
=====: 学生4
=====: 学生5

3. concatMap:
  concatMap的使用与flatMap基本一致,它可以保证输出数据有序。

flatMap和concatMap,他们两个都是将上游发送的数据都封装成一个个的Observable,再有一个Observable进行发送。
flatMapIterable
latMapIterable() 和flatMap()功能在流程上大体一致,唯一不同是:
flatMap是转一个Observable转换成多个Observable,每一个Observable最后又返回一个Observable。
flatMapInterable是将一个Observable转换成多个Observable,但是每一个Observable最后返回得是Iterable。Iterable可以理解成返回一个list
代码如下:
Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
emitter.onNext(1);
emitter.onNext(2);
emitter.onNext(2);
}
}).flatMapIterable(new Function<Integer, Iterable<String>>() {
@Override
public Iterable<String> apply(Integer integer) throws Exception {
ArrayList list=new ArrayList();
list.add(integer+"a");
list.add(integer+"b");
return list;
}
}).subscribe(new Consumer<String>() {
@Override
public void accept(String s) throws Exception {
Log.e("accept",s);
}
});
测试结果
07-03 14:09:00.219 30310-30310/com.zhqy.myrxjava E/accept: 1a
07-03 14:09:00.219 30310-30310/com.zhqy.myrxjava E/accept: 1b
07-03 14:09:00.219 30310-30310/com.zhqy.myrxjava E/accept: 2a
07-03 14:09:00.219 30310-30310/com.zhqy.myrxjava E/accept: 2b
07-03 14:09:00.219 30310-30310/com.zhqy.myrxjava E/accept: 2a
07-03 14:09:00.219 30310-30310/com.zhqy.myrxjava E/accept: 2b

二. RxJava中的组合/合并操作符,并完成实例
作用:组合多个被观察者&合并需要发送的事件。
1、类型
常见的组合/合并操作符有:
1.组合多个被观察者
a.按发送顺序:concat()、concatArray()
b.按时间: merge()、mergeArray()
c.错误处理:concatDelayError()、mergeDelayError()
2.合并多个事件
a.按数量:Zip()
b.按时间:combineLatest()、combineLatestDelayError()
c.合并成1个事件发送:reduce()、collect()
3.发送事件前追加发送事件:startWith()、startWithArray()
4.统计发送事件数量:count

2、操作符及应用介绍
1. 组合多个被观察者
1) merge()/mergeArray
作用: 组合多个被观察者一起发送数据,合并后按时间线并行执行。
merge() 操作符将多个 Observable 无序合并,当合并的 Observable<T> 泛型类型不一致时,事件流中的对象类型只能使用 Object(java),Any(kotlin)。
二者区别:组合被观察者的数量,即merge()组合被观察者数量≤4个,而mergeArray()则可>4个;
区别concat()操作符:同样是组合多个被观察者一起发送数据,但concat()操作符合并后是按发送顺序串行执行。

具体使用:
//merge()组合多个被观察者(<4个)一起发送数据
// 注:合并后按照时间线并行执行
Observable.merge(Observable.intervalRange(1,2,1,1, TimeUnit.SECONDS),Observable.intervalRange(1,3,10,1,TimeUnit.SECONDS))
.subscribe(new Observer<Long>() {
@Override
 public void onSubscribe(Disposable d) {
 }
@Override
public void onNext(Long aLong) {
Log.e(tag,"接收到了事件"+aLong);
}
});
//mergeArray()组合多个被观察者(>4个)一起发送数据
// 注:合并后按照时间线并行执行。

abc 与 123 是按照时间先后顺序交错进行输出的,说明 merge() 后事件的发送是并发的无序的,先发送先处理
多于四个 observable 的合并: 两种方式
merge() 传入一个 Iterable<? extends ObservableSource<? extends T> 对象
mergeArray() 传入一个 Observable 数组。

操作符merge使用示例:Retrofit +Rxjava 操作符之多个请求合并为一个请求:
App首页一般都比较复杂,好多公司传递数据都是3-4个接口将数据传递回来,这是可能就需要使用的多个请求合并成一个请求,最后再去更UI 。在这里需要用Rxjava的操作符merge:
void getData(){
showLoading();
Observable<TimerBean> time = RetrofitClient.getHomeApi().getTime("getServerTime");
Observable<GoodsDetailBean> goodsDetail = RetrofitClient.getHomeApi().getGoodsDetail( products_id);
Observable.merge(time,goodsDetail)
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Subscriber<Object>() {
@Override
public void onNext(Object obj) {
if (obj instanceof TimerBean){ // 获取到一个请求的数据
   TimerBean time = (TimerBean)obj;
} else if (obj instanceof GoodsDetailBean){ // 获取到第二个请求的数据
   GoodsDetailBean goodsDetailBean = (GoodsDetailBean)obj;
}
}
// 最后更新UI
});
}

2)concat()/concatArray():
作用:组合多个被观察者发送数据,合并后按发送顺序串行执行(串行执行)。Observable.concat(observable1, observable2)..
concat() 操作符将多个 Observable 按先后顺序进行合并,当合并的 Observable<T> 泛型类型不一致时,事件流中的对象类型只能使用 Object(java)。
二者区别:组合被观察者的数量,即concat()组合被观察者数量≤4个,而concatArray()可以大于4个。
多于四个 observable 的合并,两种方式:
concat() 传入一个 Iterable<? extends ObservableSource<? extends T> 对象
concatArray() 传入一个 Observable 数组。

使用场景: 如打印的结果,1 2 3 a b c。
concat 操作符是线性有序的,要等前一个 observable 发送完毕后才会处理下一个 observable。当我们需要多个接口的返回数据按顺序进行处理时可以使用 concat 操作符合并请求。

Concat和Merge的区别:
merge组合的多个Observable在不同线程并发发射时,收到的数据可能会交错,而concat则无论怎样都不会交错,都是按顺序接收。

3) concatDelayError()/mergeDelayError():
mergeDelayError(): 合并多个Observables,让没有错误的Observable都完成后再发射错误通知。2个请求其中一个请求错误时候,不影响另外一个请求。
虽然只有第一个数据流是错误的,但是整个请求却结束了,这可不是我们想要的,怎么办呢?RxJava提供mergeDelayError操作符,这个操作符的作用就是延迟错误。
concatDelayError:多个Observable合并,并按顺序发射数据, 如果发生异常,则不会立即中断发射数据,异常将延迟发射。
mergeDelayError:多个Observable合并,并行发射数据, 如果发生异常,则不会立即中断发射数据,异常将延迟发射。
List<Observable<Integer>> list = new ArrayList<>();
list.add(Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> e) throws Exception {
e.onNext(1);
e.onError(new NullPointerException("exception"));
e.onComplete();
}
}).delay(2000, TimeUnit.MILLISECONDS));

list.add(Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> e) throws Exception {
e.onNext(2);
e.onError(new NullPointerException("exception"));
e.onComplete();
}
}));
Observable.concatDelayError(list)
.subscribe(new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Exception {
Log.d("aaa", String.valueOf(integer));
}
}, new Consumer<Throwable>() {
@Override
public void accept(Throwable throwable) throws Exception {
Log.d("aaa", "发生了异常");
}
});
/ / 输出:
1
2
发生了异常

扩展:
更新:
一个页面如果有两个接口,我们一般会有一个显示主ui的接口,比方说首页有一个列表接口,有一个banner接口,如果列表接口发生错误时,我们才把错误状态的页面显示出来,但是看上面的结果,这两个接口无论哪个发生错误都会调用onError,我们也无法判断到底是哪个接口发生的错误,这样该怎么处理呢?下面展示一下最新代码:
fun getData(page: Int) {
//首页列表数据控制错误状态
val f1 = createApi().getMainPageData(page.toString()).doOnError {
errorLiveData.postValue(it.toString())
}
val f2 = createApi().getBanner()
Flowable.mergeDelayError(f1, f2)
.applySchedulers()
.subscribe({
when (it) {
is MainPageBean -> mutableLiveData.postValue(it)
is BannerBean -> bannerLiveData.postValue(it)
}
}, { }).addToList()
}
上面我们用到了doOnError操作符,只有在主ui的数据发生错误时才会显示错误状态,这里用的kotlin编写,也可以看出语法确实简洁。
等一下,applySchedulers()这个方法什么鬼,我怎么找不到,这其实是我自己定义的一个扩展方法:
fun <T> Observable<T>.applySchedulers(): Observable<T> =subscribeOn(Schedulers.io()).observeOn(AndroidSchedulers.mainThread())
fun <T> Flowable<T>.applySchedulers(): Flowable<T> = subscribeOn(Schedulers.io()).observeOn(AndroidSchedulers.mainThread())
想一下如果用Java怎么写,是不是需要用到compose操作符。

2.合并多个事件:
该类型操作符主要是对多个被观察者中的事件进行合并处理。
1)Zip():
作用:合并多个被观察者发送的事件,生成一个新的事件序列,最终发送。(数据和数据之间是一对一的关系)
注意:事件组合方式=严格按照原先的事件序列进行对位合并。最终合并的事件数量=多个被观察者中数量最少的数量。
如:数据1a,b,c 数据21,2,3,4 -> 生成: a1 b2 c3;
zip专用于合并事件,该合并不是连接(flatMap,concatMap),而是两两配对。它按照严格的顺序应用这个函数。因此它只发射与发射数据项最少的那个Observable对象一样多的数据。
使用场景:
zip操作符可以应用在界面所需要的数据需要在两个或以上的接口的数据,当获取到两个接口的数据后再进行展示。

zip()
上面的两种合并都是单个事件 item 订阅监听,如果想合并的事件item都接收到数据时处理这两个事件数据就需要使用 zip() 操作符。
Observable.zip(observable1, observable2, object : BiFunction<Int, String, String> {
override fun apply(t1: Int, t2: String): String {
return "t1=$t1 t2=$t2"
}
}).subscribe(object : AppCallBack<String>() {
override fun success(data: String?) {
println("zip----->$data")
}
})
得到的输出结果为:
t1=a, t2=1;
t1=b, t2=2;

使用 zip 合并时,会等待每一次的合并项都发送完毕后再发送下一轮的事件。当我们需要从两个数据源拿数据,但是需要统一合并显示时可以使用 zip 操作符对事件流进行合并。
多个 observable 的合并:
zip() 的多事件合并就有点厉害了,支持九个Observable 按数据类型合并,除了两个观察者对象合并时zipper是 BiFunction 其他的为 FunctionX ,X 为合并个数。如果多于九个观察者对象合并,与上面两种合并一样可以使用 zipArray() 进行合并,但是合并后的观察结果是一个 Object 数组对象,需要自己判断数据类型

2)合并操作符还有startWith()/startWithArray():
作用:在一个被观察者发送事件前,追加发送一些数据/一个新的被观察者。
startWith: 追加单个数据
startWithArray:追加多个数据
Observable.just("a","b","c")
.startWith("d")
.subscribe(new Consumer<String>() {
@Override
public void accept(String s) throws Exception {
Log.e(TAG, "accept: "+s);
}
});
打印的结果:d,a,b,c

关于数据合并的操作符就先介绍Merge,Concat,Zip,StartWith这四个吧,其实还有很多:
1. and() , then() , when():
通过模式(And条件)和计划(Then次序)组合两个或多个Observable发射的数据集
2. combineLatest():
当两个Observables中的任何一个发射了一个数据时,通过一个指定的函数组合每个Observable发射的最新数据(一共两个数据),然后发射这个函数的结果
3. join() , groupJoin():
无论何时,如果一个Observable发射了一个数据项,只要在另一个Observable发射的数据项定义的时间窗口内,就将两个Observable发射的数据合并发射
4. switch():
将一个发射Observable序列的Observable转换为这样一个Observable:它逐个发射那些Observable最近发射的数据
5 switchOnNext():
将一个发射Observables的Observable转换成另一个Observable,后者发射这些Observables最近发射的数据。
除了以上的创建 Observable 对象级别的合并操作符,还有一些事件流中的操作符,譬如concatMap ,在事件流前面添加其他事件的 startWith() 等。
combineLatest()
作用:当两个Observables中的任何一个发送了数据后,将先发送了数据的Observables的最新(最后)一个数据与另外一个Observable发送的每个数据结合,最终基于该函数的结果发送数据。
与Zip()的区别:
zip()= 按个数合并,即1对1合并; CombineLatest()=按时间合并,即在同一个时间点上合并

reduce()
作用:把被观察者要发送的事件聚合成1个事件&发送
collect()
作用:将被观察者Observable发送的数据事件收集到一个数据结构里。

一. rxjava操作符的介绍

1. 转换类操作符(map  flatMap  concatMap  flatMapIterable   switchMap   scan   groupBy...);

所有这些Operators都作用于一个可观测序列,然后变换它发射的值,最后用一种新的形式返回它们。

概念不好理解,结合实际例子介绍。

  • Map : 可以将任意发射数据 转换成任意数据类型或对象,灵活度高。

 map()函数接受一个Func1类型的参数( map(Func1<? super T, ? extends R> func)),然后把这个Func1应用到每一个由Observable发射的值上,将发射的值转换为我们期望的值。官方给出的原理图:

假设需要将一组数字转换成字符串,可以通过map这样实现:

Observable.just(1, 2, 3, 4, 5)
.map(new Func1<Integer, String>() {
@Override
public String call(Integer i) {
       return "This is " + i;
 }}).subscribe(new Action1<String>() {
  @Override
  public void call(String s) {
      System.out.println(s);
} });

Func1构造函数中的两个参数分别是Observable发射值当前的类型和map转换后的类型,上面例子中发射值当前的类型是Integer,转换后的类型是String。

  • flatMap

flatMap()函数同样也是做转换的,但是作用却不一样。

flatMap不太好理解,直接看例子(我们公司是个房产平台,那我就拿房子举例):

假设有一组小区Community[] communites,  现在我们要输出每个小区的名字;可以这样实现:

Observable.from(communities)
        .map(new Func1<Community, String>() {
            @Override
            public String call(Community community) {
                return community.name;
            }}).subscribe(new Action1<String>() {
            @Override
            public void call(String name) {
                System.out.println("Community name : " + name);
            } });

现在需求有变化,需要打印出每个小区下面所有房子的价格。于是我可以这样实现:

Community[] communities = {};
Observable.from(communities)
        .subscribe(new Action1<Community>() {
            @Override
            public void call(Community community) {
                for (House house : community.houses) {
                    System.out.println("House price : " + house.price);
                }
        }});

如果不想在Subscriber中使用for循环,而是希望Subscriber中直接传入单个的House对象呢(这对于代码复用很重要)?

用map()显然是不行的,因为map()是一对一的转化,而现在的要求是一对多的转化。那使flatMap()把一个Community转化成多个House。

Observable.from(communities)
         .flatMap(new Func1<Community, Observable<House>>() {
            @Override
            public Observable<House> call(Community community) {
                return Observable.from(community.houses);
            }})
.subscribe(new Action1<House>() { @Override public void call(House house) { System.out.println("House price : " + house.price); }});

从前面的例子中你肯定发现了,

flatMap()和map()都是把传入的参数转化之后返回另一个对象。

但和map()不同的是,flatMap()中返回的是Observable对象,并且这个Observable对象并不是被直接发送到 Subscriber的回调方法中。

flatMap()的原理是这样的:

  1. 将传入的事件对象装换成一个Observable对象;
  2. 这是不会直接发送这个Observable, 而是将这个Observable激活让它自己开始发送事件;
  3. 每一个创建出来的Observable发送的事件,都被汇入同一个Observable,这个Observable负责将这些事件统一交给Subscriber的回调方法。

  这三个步骤,把事件拆成了两级,通过一组新创建的Observable将初始的对象『铺平』之后,通过统一路径分发了下去。而这个『铺平』就是flatMap()所谓的flat。

最后看flatMap的原理图:

  • ConcatMap

    concatMap()解决了flatMap()的交叉问题,它能够把发射的值连续在一起。concatMap的使用与flatMap基本一致,它可以保证输出数据有序。就像这样:

  • FlatMapIterable

    flatMapIterable()和flatMap()几乎是一样的,不同的是flatMapIterable()它转化的多个Observable是使用Iterable作为源数据的。

Observable.from(communities)
        .flatMapIterable(new Func1<Community, Iterable<House>>() {
            @Override
            public Iterable<House> call(Community community) {
                return community.houses;
            }})
        .subscribe(new Action1<House>() {
            @Override
            public void call(House house) { }});
  • SwitchMap

   switchMap()和flatMap()很像,除了一点:每当源Observable发射一个新的数据项(Observable)时,它将取消订阅并停止监视之前那个数据项产生的Observable,

并开始监视当前发射的这一个。 

  • Scan

   scan()对一个序列的数据应用一个函数,并将这个函数的结果发射出去作为下个数据应用合格函数时的第一个参数使用。

我们来看个简单的例子:

Observable.just(1, 2, 3, 4, 5).scan(new Func2<Integer, Integer, Integer>() {
            @Override
            public Integer call(Integer integer, Integer integer2) {
                return integer + integer2;
            }
        }).subscribe(new Action1<Integer>() {
    @Override
    public void call(Integer integer) {
        System.out.print(integer+“ ”);
    }});
输出结果为:1 3 6 10 15
  • GroupBy:直接字面上的意思理解就是分组。根据某种条件,将发射出去的数据进行分组。

groupBy()将原始Observable发射的数据按照key来拆分成一些小的Observable,然后这些小Observable分别发射其所包含的的数据,和SQL中的groupBy类似。

实际使用中,我们需要提供一个生成key的规则(也就是Func1中的call方法),所有key相同的数据会包含在同一个小的Observable中。

另外我们还可以提供一个函数来对这些数据进行转化,有点类似于集成了flatMap。

单纯的文字描述和图片解释可能难以理解,我们来看个例子:

假设我现在有一组房源List<House> houses,每套房子都属于某一个小区, 现在我们需要根据小区名来对房源进行分类,然后依次将房源信息输出。

List<House> houses = new ArrayList<>();
houses.add(new House("中粮·海景壹号", "中粮海景壹号新出大平层!总价4500W起"));
houses.add(new House("竹园新村", "满五唯一,黄金地段"));
houses.add(new House("中粮·海景壹号", "毗邻汤臣一品"));
houses.add(new House("竹园新村", "顶层户型,两室一厅"));
houses.add(new House("中粮·海景壹号", "南北通透,豪华五房"));
Observable<GroupedObservable<String, House>> groupByCommunityNameObservable = Observable.from(houses)
        .groupBy(new Func1<House, String>() {
            @Override
            public String call(House house) {
                return house.communityName;
            }});
通过上面的代码我们创建了一个新的Observable:groupByCommunityNameObservable,
它将会发送一个带有GroupedObservable的序列(也就是指发送的数据项的类型为GroupedObservable)。
GroupedObservable是一个特殊的Observable,它基于一个分组的key,在这个例子中的key就是小区名。现在我们需要将分类后的房源依次输出:
Observable.concat(groupByCommunityNameObservable)
        .subscribe(new Action1<House>() {
            @Override
            public void call(House house) {
                System.out.println("小区:"+house.communityName+"; 房源描述:"+house.desc);
            }});
执行结果:
小区:中粮·海景壹号; 房源描述:中粮海景壹号新出大平层!总价4500W起
小区:中粮·海景壹号; 房源描述:毗邻汤臣一品
小区:中粮·海景壹号; 房源描述:南北通透,豪华五房
小区:竹园新村; 房源描述:满五唯一,黄金地段
小区:竹园新村; 房源描述:顶层户型,两室一厅

转换类的操作符就先介绍到这,后续还会继续介绍组合、过滤类的操作符!

1.2 RxJava系列之组合操作符

 这类operators可以同时处理多个Observable来创建我们所需要的Observable。

组合操作符主要包含:Merge, StartWith, Concat, Zip, CombineLatest, SwitchOnNext, Join等。

  • Merge

    merge(Observable, Observable)将两个Observable发射的事件序列组合并成一个事件序列,就像是一个Observable发射的一样。

理解为两个Obsrvable合并成了一个Observable,合并后的数据是无序的。

一共有两个Observable:一个用来发送字母,另一个用来发送数字;现在我们需要两连个Observable发射的数据合并。

String[] letters = new String[]{"A", "B", "C", "D", "E", "F", "G", "H"};
Observable<String> letterSequence = Observable.interval(300, TimeUnit.MILLISECONDS)
        .map(new Func1<Long, String>() {
            @Override
            public String call(Long position) {
                return letters[position.intValue()];
            }
        }).take(letters.length);
Observable<Long> numberSequence = Observable.interval(500, TimeUnit.MILLISECONDS).take(5);
Observable.merge(letterSequence, numberSequence)
        .subscribe(new Observer<Serializable>() {
            @Override
            public void onCompleted() {
                System.exit(0);
            }
            @Override
            public void onError(Throwable e) {
                System.out.println("Error:" + e.getMessage());
            }
            @Override
            public void onNext(Serializable serializable) {
                System.out.print(serializable.toString()+" ");
            }});   
程序输出:A 0 B C 1 D E 2 F 3 G H 4 

merge(Observable[])将多个Observable发射的事件序列组合并成一个事件序列,就像是一个Observable发射的一样。

 

  • Concat

concat(Observable<? extends T>, Observable<? extends T>) concat(Observable<? extends Observable<T>>)用于将多个obserbavle发射的的数据进行合并发射,concat严格按照顺序发射数据,前一个Observable没发射完是不会发射后一个Observable的数据的。

它和merge、startWitch和相似,不同之处在于:

  1. merge: 合并后发射的数据是无序的;
  2. startWitch:只能在源Observable发射的数据前插入数据。

这里我们将前面Merge操作符的例子拿过来,并将操作符换成Concat,然后我们看看执行结果:

String[] letters = new String[]{"A", "B", "C", "D", "E", "F", "G", "H"};
Observable<String> letterSequence = Observable.interval(300, TimeUnit.MILLISECONDS)
        .map(new Func1<Long, String>() {
            @Override
            public String call(Long position) {
                return letters[position.intValue()];
            }}).take(letters.length);
Observable<Long> numberSequence = Observable.interval(500, TimeUnit.MILLISECONDS).take(5);
Observable.concat(letterSequence, numberSequence)
        .subscribe(new Observer<Serializable>() {
            @Override
            public void onCompleted() {
                System.exit(0);
            }
            @Override
            public void onNext(Serializable serializable) {
                System.out.print(serializable.toString() + " ");
            }});
程序输出:A B C D E F G H 0 1 2 3 4 
  • Zip

zip(Observable, Observable, Func2) 用来合并两个Observable发射的数据项,根据Func2函数生成一个新的值并发射出去。

当其中一个Observable发送数据结束或者出现异常后,另一个Observable也将停在发射数据。

和前面的例子一样,我们将操作符换成了zip:

 String[] letters = new String[]{"A", "B", "C", "D", "E", "F", "G", "H"};

Observable<String> letterSequence = Observable.interval(120, TimeUnit.MILLISECONDS)
        .map(new Func1<Long, String>() {
            @Override
            public String call(Long position) {
                return letters[position.intValue()];
            } }).take(letters.length);
Observable<Long> numberSequence = Observable.interval(200, TimeUnit.MILLISECONDS).take(5);
Observable.zip(letterSequence, numberSequence, new Func2<String, Long, String>() {
    @Override
    public String call(String letter, Long number) {
        return letter + number;
    }}).subscribe(new Observer<String>() {
    @Override
    public void onCompleted() {
        System.exit(0);
    }
    @Override
    public void onError(Throwable e) {
        System.out.println("Error:" + e.getMessage());
    }
    @Override
    public void onNext(String result) {
        System.out.print(result + " ");
    }});
程序输出:A0 B1 C2 D3 E4 
  • StartWith

startWith(T)用于在源Observable发射的数据前插入数据。使用startWith(Iterable<T>)我们还可以在源Observable发射的数据前插入Iterable。官方示意图:

startWith(Observable<T>)用于在源Observable发射的数据前插入另一个Observable发射的数据(这些数据会被插入到 源Observable发射数据的前面)。官方示意图:

  • CombineLatest

combineLatest(Observable, Observable, Func2)用于将两个Observale最近发射的数据已经Func2函数的规则进展组合。下面是官方提供的原理图:

下面这张图应该更容易理解:

List<String> communityNames = DataSimulator.getCommunityNames();
List<Location> locations = DataSimulator.getLocations();
Observable<String> communityNameSequence = Observable.interval(1, TimeUnit.SECONDS)
        .map(new Func1<Long, String>() {
            @Override
            public String call(Long position) {
                return communityNames.get(position.intValue());
            }}).take(communityNames.size());
Observable<Location> locationSequence = Observable.interval(1, TimeUnit.SECONDS)
        .map(new Func1<Long, Location>() {
            @Override
            public Location call(Long position) {
                return locations.get(position.intValue());
            }}).take(locations.size());
Observable.combineLatest(
        communityNameSequence,
        locationSequence,
        new Func2<String, Location, String>() {
            @Override
            public String call(String communityName, Location location) {
                return "小区名:" + communityName + ", 经纬度:" + location.toString();
            }
        }).subscribe(new Observer<String>() {
            @Override
            public void onCompleted() {
                System.exit(0);
            }
            @Override
            public void onError(Throwable e) {
                System.out.println("Error:" + e.getMessage());}
            @Override
            public void onNext(String s) {
                System.out.println(s);
            } });
程序输出:
小区名:竹园新村, 经纬度:(21.827, 23.323)
小区名:康桥半岛, 经纬度:(21.827, 23.323)
小区名:康桥半岛, 经纬度:(11.923, 16.309)
小区名:中粮·海景壹号, 经纬度:(11.923, 16.309)
小区名:中粮·海景壹号, 经纬度:(22.273, 53.623)
小区名:浦江名苑, 经纬度:(22.273, 53.623)
小区名:南辉小区, 经纬度:(22.273, 53.623)
  • SwitchOnNext

switchOnNext(Observable<? extends Observable<? extends T>>用来将一个发射多个小Observable的源Observable转化为一个Observable,然后发射这多个小Observable所发射的数据。如果一个小的Observable正在发射数据的时候,源Observable又发射出一个新的小Observable,则前一个Observable发射的数据会被抛弃,直接发射新 的小Observable所发射的数据。 

结合下面的原理图大家应该很容易理解,我们可以看到下图中的黄色圆圈就被丢弃了。

  • Join

   join(Observable, Func1, Func1, Func2)  先介绍下join操作符的4个参数:

  • Observable:源Observable需要组合的Observable,这里我们姑且称之为目标Observable;
  • Func1:接收从源Observable发射来的数据,并返回一个Observable,这个Observable的声明周期决定了源Obsrvable发射出来的数据的有效期;
  • Func1:接收目标Observable发射来的数据,并返回一个Observable,这个Observable的声明周期决定了目标Obsrvable发射出来的数据的有效期;
  • Func2:接收从源Observable和目标Observable发射出来的数据,并将这两个数据组合后返回。

所以Join操作符的语法结构大致是这样的:onservableA.join(observableB, 控制observableA发射数据有效期的函数, 控制observableB发射数据有效期的函数,两个observable发射数据的合并规则)

join操作符的效果类似于排列组合,把第一个数据源A作为基座窗口,他根据自己的节奏不断发射数据元素,第二个数据源B,每发射一个数据,我们都把它和第一个数据源A中已经发射的数据进行一对一匹配;举例来说,如果某一时刻B发射了一个数据“B”,此时A已经发射了0,1,2,3共四个数据,那么我们的合并操作就会把“B”依次与0,1,2,3配对,得到四组数据: [0, B][1, B] [2, B] [3, B]

再看看下面的图是不是好理解了呢?!

读懂了上面的文字,我们再来写段代码加深理解。

final List<House> houses = DataSimulator.getHouses();//模拟的房源数据,用于测试
//用来每秒从houses总取出一套房源并发射出去
Observable<House> houseSequence =Observable.interval(1, TimeUnit.SECONDS).map(new Func1<Long, House>() {
                    @Override
                    public House call(Long position) {
                        return houses.get(position.intValue());
                    }}).take(houses.size());//这里的take是为了防止houses.get(position.intValue())数组越界
//用来实现每秒发送一个新的Long型数据
Observable<Long> tictoc = Observable.interval(1, TimeUnit.SECONDS);
houseSequence.join(tictoc,
        new Func1<House, Observable<Long>>() {
            @Override
            public Observable<Long> call(House house) {
                return Observable.timer(2, TimeUnit.SECONDS);
            } },
        new Func1<Long, Observable<Long>>() {
            @Override
            public Observable<Long> call(Long aLong) {
                return Observable.timer(0, TimeUnit.SECONDS);
            } },
        new Func2<House, Long, String>() {
            @Override
            public String call(House house, Long aLong) {
                return aLong + "-->" + house.getDesc();
            }
        }).subscribe(new Observer<String>() {
    @Override
    public void onCompleted() {
        System.exit(0);}
    @Override
    public void onError(Throwable e) {
        System.out.println("Error:"+e.getMessage());}
    @Override
    public void onNext(String s) {
        System.out.println(s);}});

程序输出:

0-->中粮海景壹号新出大平层!总价4500W起
1-->中粮海景壹号新出大平层!总价4500W起
1-->满五唯一,黄金地段
2-->中粮海景壹号新出大平层!总价4500W起
2-->满五唯一,黄金地段
2-->一楼自带小花园
3-->一楼自带小花园
3-->毗邻汤臣一品
4-->毗邻汤臣一品
4-->顶级住宅,给您总统般尊贵体验
5-->顶级住宅,给您总统般尊贵体验
5-->顶层户型,两室一厅
6-->顶层户型,两室一厅
6-->南北通透,豪华五房
7-->南北通透,豪华五房

1.3  过滤类操作符(fileter take takeLast takeUntil distinct distinctUntilChanged skip skipLast ...);

 顾名思义,这类operators主要用于对事件数据的筛选过滤,只返回满足我们条件的数据。

过滤类操作符主要包含: Filter, Take, TakeLast, TakeUntilSkip, SkipLast, ElementAt, Debounce, Distinct, DistinctUntilChanged, First, Last等等。

  • Filter 

filter(Func1)用来过滤观测序列中我们不想要的值,只返回满足条件的值,我们看下原理图:

还是拿前面文章中的小区Community[] communities来举例,假设我需要赛选出所有房源数大于10个的小区,我们可以这样实现:

Observable.from(communities)
        .filter(new Func1<Community, Boolean>() {
            @Override
            public Boolean call(Community community) {
                return community.houses.size()>10;
            }
        }).subscribe(new Action1<Community>() {
    @Override
    public void call(Community community) {
        System.out.println(community.name);
    }});
  • Take 

take(int)用一个整数n作为一个参数,从原始的序列中 发射前n个元素.

现在我们需要取小区列表communities中的前10个小区

Observable.from(communities)
.take(10)
.subscribe(new Action1<Community>() {
@Override
public void call(Community community) {
System.out.println(community.name);
}});

  • TakeLast

takeLast(int)同样用一个整数n作为参数,只不过它发射的是观测序列中后n个元素。

获取小区列表communities中的后3个小区

Observable.from(communities)
.takeLast(3)
.subscribe(new Action1<Community>() {
@Override
public void call(Community community) {
System.out.println(community.name);
} });

  • TakeUntil

takeUntil(Observable)订阅并开始发射原始Observable,同时监视我们提供的第二个Observable。如果第二个Observable发射了一项数据或者发射了一个终止通知,takeUntil()返回的Observable会停止发射原始Observable并终止。

Observable<Long> observableA = Observable.interval(300, TimeUnit.MILLISECONDS);
Observable<Long> observableB = Observable.interval(800, TimeUnit.MILLISECONDS);
observableA.takeUntil(observableB)
        .subscribe(new Subscriber<Long>() {
            @Override
            public void onCompleted() {
                System.exit(0);}
            @Override
            public void onNext(Long aLong) {
                System.out.println(aLong);
            }});
try {
    Thread.sleep(Integer.MAX_VALUE);
} catch (InterruptedException e) {}

程序输出:

0
1

takeUntil(Func1)通过Func1中的call方法来判断是否需要终止发射数据。

Observable.just(1, 2, 3, 4, 5, 6, 7)
.takeUntil(new Func1<Integer, Boolean>() {
@Override
public Boolean call(Integer integer) {
return integer >= 5;
}}).subscribe(new Action1<Integer>() {
@Override
public void call(Integer integer) {
System.out.println(integer);
}});

程序输出:

1
2
3
4
5
  • Skip

skip(int)让我们可以忽略Observable发射的前n项数据。

过滤掉小区列表communities中的前5个小区

Observable.from(communities)
.skip(5)
.subscribe(new Action1<Community>() {
@Override
public void call(Community community) {
System.out.println(community.name);
}});

  • SkipLast

skipLast(int)忽略Observable发射的后n项数据。

  • ElementAt 

elementAt(int)用来获取元素Observable发射的事件序列中的第n项数据,并当做唯一的数据发射出去。

  • Debounce

debounce(long, TimeUnit)过滤掉了由Observable发射的速率过快的数据;如果在一个指定的时间间隔过去了仍旧没有发射一个,那么它将发射最后的那个。通常我们用来结合RxBinding(Jake Wharton大神使用RxJava封装的Android UI组件)使用,防止button重复点击。

debounce(Func1)可以根据Func1的call方法中的函数来过滤,Func1中的中的call方法返回了一个临时的Observable,如果原始的Observable在发射一个新的数据时,上一个数据根据Func1的call方法生成的临时Observable还没结束,那么上一个数据就会被过滤掉。

  • Distinct

distinct()的过滤规则是只允许还没有发射过的数据通过,所有重复的数据项都只会发射一次。

过滤掉一段数字中的重复项:

Observable.just(2, 1, 2, 2, 3, 4, 3, 4, 5, 5)
.distinct()
.subscribe(new Action1<Integer>() {
@Override
public void call(Integer i) {
System.out.print(i + " ");
}});

程序输出:

2 1 3 4 5 

distinct(Func1)参数中的Func1中的call方法会根据Observable发射的值生成一个Key,然后比较这个key来判断两个数据是不是相同;如果判定为重复则会和distinct()一样过滤掉重复的数据项。

假设我们要过滤掉一堆房源中小区名重复的小区:

List<House> houses = new ArrayList<>();
//House构造函数中的第一个参数为该房源所属小区名,第二个参数为房源描述
List<House> houses = new ArrayList<>();
houses.add(new House("中粮·海景壹号", "中粮海景壹号新出大平层!总价4500W起"));
houses.add(new House("竹园新村", "满五唯一,黄金地段"));
houses.add(new House("竹园新村", "一楼自带小花园"));
houses.add(new House("中粮·海景壹号", "毗邻汤臣一品"));
houses.add(new House("中粮·海景壹号", "顶级住宅,给您总统般尊贵体验"));
houses.add(new House("竹园新村", "顶层户型,两室一厅"));
houses.add(new House("中粮·海景壹号", "南北通透,豪华五房"));
Observable.from(houses).distinct(new Func1<House, String>() {
            @Override
            public String call(House house) {
                return house.communityName;
            }}).subscribe(new Action1<House>() {
            @Override
            public void call(House house) {
                System.out.println("小区:" + house.communityName + "; 房源描述:" + house.desc);
            }});            
程序输出:
小区:中粮·海景壹号; 房源描述:中粮海景壹号新出大平层!总价4500W起
小区:竹园新村; 房源描述:满五唯一,黄金地段
  • DistinctUntilChanged

distinctUntilChanged()和distinct()类似,只不过它判定的是Observable发射的当前数据项和前一个数据项是否相同。

同样还是上面过滤数字的例子:
Observable.just(2, 1, 2, 2, 3, 4, 3, 4, 5, 5)
        .distinctUntilChanged()
        .subscribe(new Action1<Integer>() {
            @Override
            public void call(Integer i) {
                System.out.print(i + " ");
            }});
程序输出:2 1 2 3 4 3 4 5 

distinctUntilChanged(Func1)和distinct(Func1)一样,根据Func1中call方法产生一个Key来判断两个相邻的数据项是否相同。

我们还是拿前面的过滤房源的例子:

Observable.from(houses).distinctUntilChanged(new Func1<House, String>() {
            @Override
            public String call(House house) {
                return house.communityName;
            }
        }).subscribe(new Action1<House>() {
    @Override
    public void call(House house) {
        System.out.println("小区:" + house.communityName + "; 房源描述:" + house.desc);
    }});

程序输出:

小区:中粮·海景壹号; 房源描述:中粮海景壹号新出大平层!总价4500W起
小区:竹园新村; 房源描述:满五唯一,黄金地段
小区:中粮·海景壹号; 房源描述:毗邻汤臣一品
小区:竹园新村; 房源描述:顶层户型,两室一厅
小区:中粮·海景壹号; 房源描述:南北通透,豪华五房
  • First

first()顾名思义,它是的Observable只发送观测序列中的第一个数据项。

获取房源列表houses中的第一套房源:
Observable.from(houses)
        .first()
        .subscribe(new Action1<House>() {
            @Override
            public void call(House house) {
                System.out.println("小区:" + house.communityName + "; 房源描述:" + house.desc);
            }    });
程序输出:小区:中粮·海景壹号; 房源描述:中粮海景壹号新出大平层!总价4500W起

first(Func1)只发送符合条件的第一个数据项。

现在我们要获取房源列表houses中小区名为竹园新村的第一套房源。

Observable.from(houses)
        .first(new Func1<House, Boolean>() {
            @Override
            public Boolean call(House house) {
                return "竹园新村".equals(house.communityName);
            }})
        .subscribe(new Action1<House>() {
            @Override
            public void call(House house) {
                System.out.println("小区:" + house.communityName + "; 房源描述:" + house.desc);
            }});
程序输出:小区:竹园新村; 房源描述:满五唯一,黄金地段
  • Last

     last()只发射观测序列中的最后一个数据项。

获取房源列表中的最后一套房源:

Observable.from(houses)
        .last()
        .subscribe(new Action1<House>() {
            @Override
            public void call(House house) {
                System.out.println("小区:" + house.communityName + "; 房源描述:" + house.desc);
            } });
程序输出:小区:中粮·海景壹号; 房源描述:南北通透,豪华五房

last(Func1)只发射观测序列中符合条件的最后一个数据项。

获取房源列表houses中小区名为竹园新村的最后一套房源:

Observable.from(houses)
        .last(new Func1<House, Boolean>() {
            @Override
            public Boolean call(House house) {
                return "竹园新村".equals(house.communityName);
            }})
        .subscribe(new Action1<House>() {
            @Override
            public void call(House house) {
                System.out.println("小区:" + house.communityName + "; 房源描述:" + house.desc);
            }});
程序输出:小区:竹园新村; 房源描述:顶层户型,两室一厅

过滤类操作符的介绍大家可以去查阅官方文档和源码;在下一章继续介绍组合类操作符。

通过转换操作符过滤操作符组合操作符三个篇幅将RxJava主要的操作符也介绍的七七八八了。更多操作符的介绍建议大家去查阅官方文档,并自己动手实践一下。 

Demo源码地址:GitHub - BaronZ88/HelloRxJava: RxJavaDemo