Flowable subscribe流程介绍
2023-02-18 16:34:06 时间
转载请以链接形式标明出处: 本文出自:103style的博客
Flowable 的 subscribe 方法
public final Disposable subscribe() {
return subscribe(Functions.emptyConsumer(), Functions.ON_ERROR_MISSING,
Functions.EMPTY_ACTION, FlowableInternalHelper.RequestMax.INSTANCE);
}
public final Disposable subscribe(Consumer<? super T> onNext) {
return subscribe(onNext, Functions.ON_ERROR_MISSING,
Functions.EMPTY_ACTION, FlowableInternalHelper.RequestMax.INSTANCE);
}
public final Disposable subscribe(Consumer<? super T> onNext, Consumer<? super Throwable> onError) {
return subscribe(onNext, onError, Functions.EMPTY_ACTION,
FlowableInternalHelper.RequestMax.INSTANCE);
}
public final Disposable subscribe(Consumer<? super T> onNext, Consumer<? super Throwable> onError,
Action onComplete) {
return subscribe(onNext, onError, onComplete,
FlowableInternalHelper.RequestMax.INSTANCE);
}
public final Disposable subscribe(Consumer<? super T> onNext, Consumer<? super Throwable> onError,
Action onComplete, Consumer<? super Subscription> onSubscribe) {
LambdaSubscriber<T> ls = new LambdaSubscriber<T>(onNext, onError, onComplete, onSubscribe);
subscribe(ls);
return ls;
}
public final void subscribe(Subscriber<? super T> s) {
if (s instanceof FlowableSubscriber) {
subscribe((FlowableSubscriber<? super T>)s);
} else {
ObjectHelper.requireNonNull(s, "s is null");
subscribe(new StrictSubscriber<T>(s));
}
}
public final void subscribe(FlowableSubscriber<? super T> s) {
ObjectHelper.requireNonNull(s, "s is null");
try {
Subscriber<? super T> z = RxJavaPlugins.onSubscribe(this, s);
subscribeActual(z);
} catch (...) {
...
}
}
protected abstract void subscribeActual(Subscriber<? super T> s);
前面四个方法都是调用了通过默认的:
Functions.emptyConsumer()
:
static final class EmptyConsumer implements Consumer<Object> {
@Override
public void accept(Object v) { }
@Override
public String toString() {
return "EmptyConsumer";
}
}
Functions.ON_ERROR_MISSING
:
static final class OnErrorMissingConsumer implements Consumer<Throwable> {
@Override
public void accept(Throwable error) {
RxJavaPlugins.onError(new OnErrorNotImplementedException(error));
}
}
Functions.EMPTY_ACTION
:
static final class EmptyAction implements Action {
@Override
public void run() { }
@Override
public String toString() {
return "EmptyAction";
}
}
FlowableInternalHelper.RequestMax.INSTANCE
:
public enum RequestMax implements Consumer<Subscription> {
INSTANCE;
@Override
public void accept(Subscription t) throws Exception {
t.request(Long.MAX_VALUE);
}
}
调用了subscribe(onNext, onError, onComplete, onSubscribe)
,然后将四个参数包装成一个 LambdaSubscriber
对象 传递给 子类重写 的 subscribeActual
方法。
而 subscribe(Subscriber<? super T> s)
则通过自己传递 实现FlowableSubscriber
接口 或者 传递一个Subscriber
构造成StrictSubscriber
传递给 子类重写 的 subscribeActual
方法。
然后接下来的流程就和 Rxjava之create操作符源码解析 中介绍的类似。
以上。
相关文章
- JAVA以UTF-8导出CSV文件,用excel打开产生乱码的解决方法
- 性能测试(第3集)第17讲JMeter测试计划&线程组&HTTPCookie管理&用户定义的变量&HTTP头信息管理&循环控制器
- 性能测试(第3集)第18讲:JMeter HTTP Request&参数化&CVS Data Set Config&函数助手
- 性能测试(第3集)第19讲:JMeter各种断言
- 正则表达式写法分享
- 性能测试(第3集)第20讲:JMeter察看结果树及正则表达式的使用
- 性能测试(第3集)第21讲:JMeter JDBC介绍
- idea下远程debug时,一键上传本地代码到服务器指定目录,无需再依赖文件传输工具
- keepalived安装配置
- inotify-tools工具安装配置
- Keepalived工作原理
- HBase单机实现主主复制(高可用方案)
- 文件同步工具rsyncd介绍及安装配置
- 【测开技能】Java系列(三十一)包
- 异步任务如何测试?
- Jenkins介绍与安装
- jenkins 如何去创建一个job
- Jenkins权限管理
- Jenkins配置邮件通知
- 鱼跃医疗:遭遇“滑铁卢”