RxJava之concatMap系列转换操作符源码介绍
转载请以链接形式标明出处: 本文出自:103style的博客
转换相关的操作符 以及 官方介绍
RxJava
之 concatMap
系列 转换操作符 官方介绍 :Transforming Observables
concatMap
concatMapCompletable
concatMapCompletableDelayError
concatMapDelayError
concatMapEager
concatMapEagerDelayError
concatMapIterable
concatMapMaybe
concatMapMaybeDelayError
concatMapSingle
concatMapSingleDelayError
以下介绍我们就直接具体实现,中间流程请参考 RxJava之create操作符源码解析。
concatMap
官方示例:
Observable.range(0, 5)
.concatMap(i -> {
long delay = Math.round(Math.random() * 2);
return Observable.timer(delay, TimeUnit.SECONDS).map(n -> i);
})
.blockingSubscribe(System.out::print);
输出:
01234
返回对象的 ObservableConcatMap
的 subscribeActual
方法:
单参数的concatMap
操作符默认的 delayErrors
为 ErrorMode.IMMEDIATE
。
public void subscribeActual(Observer<? super U> observer) {
if (ObservableScalarXMap.tryScalarXMapSubscribe(source, observer, mapper)) {
return;
}
if (delayErrors == ErrorMode.IMMEDIATE) {
SerializedObserver<U> serial = new SerializedObserver<U>(observer);
source.subscribe(new SourceObserver<T, U>(serial, mapper, bufferSize));
} else {
source.subscribe(new ConcatMapDelayErrorObserver<T, U>(observer, mapper, bufferSize, delayErrors == ErrorMode.END));
}
}
继续看 SourceObserver
的 onNext(T t)
:
public void onNext(T t) {
if (done) {
return;
}
if (fusionMode == QueueDisposable.NONE) {
queue.offer(t);
}
drain();
}
public void onComplete() {
if (done) {
return;
}
done = true;
drain();
}
void drain() {
...
for (;;) {
...
if (!active) {
...
if (!empty) {
ObservableSource<? extends U> o;
try {
//1.0
o = ObjectHelper.requireNonNull(mapper.apply(t), "The mapper returned a null ObservableSource");
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
dispose();
queue.clear();
downstream.onError(ex);
return;
}
active = true;
o.subscribe(inner);//2.0
}
}
...
}
}
(1.0)
在这里我们看到通过concatMap
操作符传入Function
的apply
重新构建了一个ObservableSource
对象。(2.0)
然后新建的ObservableSource
对象来subscribe(observer)
。
concatMapXXX
concatMapCompletable
、concatMapCompletableDelayError
、concatMapDelayError
、concatMapEager
、concatMapEagerDelayError
、concatMapIterable
、concatMapMaybe
、concatMapMaybeDelayError
、concatMapSingle
、concatMapSingleDelayError
实现逻辑和concatMap
类似,就不再赘述了。
官方示例:
concatMapCompletable
Observable<Integer> source = Observable.just(2, 1, 3);
Completable completable = source.concatMapCompletable(x -> {
return Completable.timer(x, TimeUnit.SECONDS)
.doOnComplete(() -> System.out.println("Info: Processing of item \"" + x + "\" completed"));
});
completable.doOnComplete(() -> System.out.println("Info: Processing of all items completed"))
.blockingAwait();
输出:
Info: Processing of item "2" completed
Info: Processing of item "1" completed
Info: Processing of item "3" completed
Info: Processing of all items completed
concatMapCompletableDelayError
Observable<Integer> source = Observable.just(2, 1, 3);
Completable completable = source.concatMapCompletableDelayError(x -> {
if (x.equals(2)) {
return Completable.error(new IOException("Processing of item \"" + x + "\" failed!"));
} else {
return Completable.timer(1, TimeUnit.SECONDS)
.doOnComplete(() -> System.out.println("Info: Processing of item \"" + x + "\" completed"));
}
});
completable.doOnError(error -> System.out.println("Error: " + error.getMessage()))
.onErrorComplete()
.blockingAwait();
输出:
Info: Processing of item "1" completed
Info: Processing of item "3" completed
Error: Processing of item "2" failed!
concatMapDelayError
Observable.intervalRange(1, 3, 0, 1, TimeUnit.SECONDS)
.concatMapDelayError(x -> {
if (x.equals(1L))
return Observable.error(new IOException("Something went wrong!"));
else return Observable.just(x, x * x);
})
.blockingSubscribe(
x -> System.out.println("onNext: " + x),
error -> System.out.println("onError: " + error.getMessage()));
输出:
onNext: 2
onNext: 4
onNext: 3
onNext: 9
onError: Something went wrong!
concatMapEager
Observable.range(0, 5)
.concatMapEager(i -> {
long delay = Math.round(Math.random() * 3);
return Observable.timer(delay, TimeUnit.SECONDS)
.map(n -> i)
.doOnNext(x -> System.out.println("Info: Finished processing item " + x));
})
.blockingSubscribe(i -> System.out.println("onNext: " + i));
输出:
Info: Finished processing item 2
Info: Finished processing item 3
Info: Finished processing item 1
Info: Finished processing item 0
Info: Finished processing item 4
onNext: 0
onNext: 1
onNext: 2
onNext: 3
onNext: 4
concatMapEagerDelayError
Observable<Integer> source = Observable.create(emitter -> {
emitter.onNext(1);
emitter.onNext(2);
emitter.onError(new Error("Fatal error!"));
});
source.doOnError(error -> System.out.println("Info: Error from main source " + error.getMessage()))
.concatMapEagerDelayError(x -> {
return Observable.timer(1, TimeUnit.SECONDS).map(n -> x)
.doOnSubscribe(it -> System.out.println("Info: Processing of item \"" + x + "\" started"));
}, true)
.blockingSubscribe(
x -> System.out.println("onNext: " + x),
error -> System.out.println("onError: " + error.getMessage()));
输出:
Info: Processing of item "1" started
Info: Processing of item "2" started
Info: Error from main source Fatal error!
onNext: 1
onNext: 2
onError: Fatal error!
concatMapIterable
Observable.just("A", "B", "C")
.concatMapIterable(item -> Arrays.asList(item, item, item))
.subscribe(System.out::print);
输出:
AAABBBCCC
concatMapMaybe
Observable.just("5", "3,14", "2.71", "FF")
.concatMapMaybe(v -> {
return Maybe.fromCallable(() -> Double.parseDouble(v))
.doOnError(e -> System.out.println("Info: The value \"" + v + "\" could not be parsed."))
// Ignore values that can not be parsed.
.onErrorComplete();
})
.subscribe(x -> System.out.println("onNext: " + x));
输出:
onNext: 5.0
Info: The value "3,14" could not be parsed.
onNext: 2.71
Info: The value "FF" could not be parsed.
concatMapMaybeDelayError
DateTimeFormatter dateFormatter = DateTimeFormatter.ofPattern("dd.MM.uuuu");
Observable.just("04.03.2018", "12-08-2018", "06.10.2018", "01.12.2018")
.concatMapMaybeDelayError(date -> {
return Maybe.fromCallable(() -> LocalDate.parse(date, dateFormatter));
})
.subscribe(
localDate -> System.out.println("onNext: " + localDate),
error -> System.out.println("onError: " + error.getMessage()));
输出:
onNext: 2018-03-04
onNext: 2018-10-06
onNext: 2018-12-01
onError: Text '12-08-2018' could not be parsed at index 2
concatMapSingle
Observable.just("5", "3,14", "2.71", "FF")
.concatMapSingle(v -> {
return Single.fromCallable(() -> Double.parseDouble(v))
.doOnError(e -> System.out.println("Info: The value \"" + v + "\" could not be parsed."))
// Return a default value if the given value can not be parsed.
.onErrorReturnItem(42.0);
})
.subscribe(x -> System.out.println("onNext: " + x));
输出:
onNext: 5.0
Info: The value "3,14" could not be parsed.
onNext: 42.0
onNext: 2.71
Info: The value "FF" could not be parsed.
onNext: 42.0
concatMapSingleDelayError
DateTimeFormatter dateFormatter = DateTimeFormatter.ofPattern("dd.MM.uuuu");
Observable.just("24.03.2018", "12-08-2018", "06.10.2018", "01.12.2018")
.concatMapSingleDelayError(date -> {
return Single.fromCallable(() -> LocalDate.parse(date, dateFormatter));
})
.subscribe(
localDate -> System.out.println("onNext: " + localDate),
error -> System.out.println("onError: " + error.getMessage()));
输出:
onNext: 2018-03-24
onNext: 2018-10-06
onNext: 2018-12-01
onError: Text '12-08-2018' could not be parsed at index 2
相关文章
- [javaEE] Servlet的手动配置
- [javaEE] HTTP协议总结
- [javaEE] web应用的目录结构&配置虚拟主机
- [javaEE] Tomcat的安装与配置
- 「万字图文」史上最姨母级Java继承详解
- 带你深入理解Java的IO到底是个啥
- 不藏了,这些Java反射用法总结都告诉你们
- java算法易筋经:常见java-API使用技巧
- 论文/代码速递2022.10.19!
- 论文/代码速递2022.10.20!
- 【AI绘画】如何优雅的在本地配置 nounovelai ?
- 英伟达最新成果!基于NeRF的并行优化方法,可用于6D姿态估计!论文/代码速递2022.10.21!
- 论文/代码速递2022.10.24!
- 低分辨率人脸识别!注意力相似性知识提取方法!论文/代码速递2022.10.25!
- 论文/代码速递2022.10.26!
- ECCV 2022 | 开放集半监督目标检测!论文/代码速递2022.10.27!
- 论文/代码速递2022.10.28!
- SCI语料库!学术写作神器——Academic Phrasebank
- 查找表实现高效的图像超分辨率!论文/代码速递2022.10.31!
- 论文/代码速递2022.11.1!