【我的Android进阶之旅】 RxJava 理解Backpressure并解决异常 rx.exceptions.MissingBackpressureException
2023-09-27 14:29:23 时间
java.lang.IllegalStateException: Exception thrown on Scheduler.Worker thread. Add `onError` handling. at rx.android.schedulers.LooperScheduler$ScheduledAction.run(LooperScheduler.java:112) at android.os.Handler.handleCallback(Handler.java:751) at android.os.Handler.dispatchMessage(Handler.java:95) at android.os.Looper.loop(Looper.java:154) at android.app.ActivityThread.main(ActivityThread.java:6119) at java.lang.reflect.Method.invoke(Native Method) at com.android.internal.os.ZygoteInit$MethodAndArgsCaller.run(ZygoteInit.java:886) at com.android.internal.os.ZygoteInit.main(ZygoteInit.java:776) Caused by: rx.exceptions.OnErrorNotImplementedException at rx.internal.util.InternalObservableUtils$ErrorNotImplementedAction.call(InternalObservableUtils.java:386) at rx.internal.util.InternalObservableUtils$ErrorNotImplementedAction.call(InternalObservableUtils.java:383) at rx.internal.util.ActionSubscriber.onError(ActionSubscriber.java:44) at rx.observers.SafeSubscriber._onError(SafeSubscriber.java:157) at rx.observers.SafeSubscriber.onError(SafeSubscriber.java:120) at rx.internal.operators.OperatorObserveOn$ObserveOnSubscriber.checkTerminated(OperatorObserveOn.java:276) at rx.internal.operators.OperatorObserveOn$ObserveOnSubscriber.call(OperatorObserveOn.java:219) at rx.android.schedulers.LooperScheduler$ScheduledAction.run(LooperScheduler.java:107) ... 7 more Caused by: rx.exceptions.MissingBackpressureException at rx.internal.operators.OperatorObserveOn$ObserveOnSubscriber.onNext(OperatorObserveOn.java:162) at rx.internal.operators.OnSubscribeTimerPeriodically$1.call(OnSubscribeTimerPeriodically.java:52) at rx.Scheduler$Worker$1.call(Scheduler.java:134) at rx.internal.schedulers.EventLoopsScheduler$EventLoopWorker$2.call(EventLoopsScheduler.java:187) at rx.internal.schedulers.ScheduledAction.run(ScheduledAction.java:55) at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:428) at java.util.concurrent.FutureTask.run(FutureTask.java:237) at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:272) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1133) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:607) at java.lang.Thread.run(Thread.java:761) rx.exceptions.OnErrorNotImplementedException at rx.internal.util.InternalObservableUtils$ErrorNotImplementedAction.call(InternalObservableUtils.java:386) at rx.internal.util.InternalObservableUtils$ErrorNotImplementedAction.call(InternalObservableUtils.java:383) at rx.internal.util.ActionSubscriber.onError(ActionSubscriber.java:44) at rx.observers.SafeSubscriber._onError(SafeSubscriber.java:157) at rx.observers.SafeSubscriber.onError(SafeSubscriber.java:120) at rx.internal.operators.OperatorObserveOn$ObserveOnSubscriber.checkTerminated(OperatorObserveOn.java:276) at rx.internal.operators.OperatorObserveOn$ObserveOnSubscriber.call(OperatorObserveOn.java:219) at rx.android.schedulers.LooperScheduler$ScheduledAction.run(LooperScheduler.java:107) at android.os.Handler.handleCallback(Handler.java:751) at android.os.Handler.dispatchMessage(Handler.java:95) at android.os.Looper.loop(Looper.java:154) at android.app.ActivityThread.main(ActivityThread.java:6119) at java.lang.reflect.Method.invoke(Native Method) at com.android.internal.os.ZygoteInit$MethodAndArgsCaller.run(ZygoteInit.java:886) at com.android.internal.os.ZygoteInit.main(ZygoteInit.java:776) Caused by: rx.exceptions.MissingBackpressureException at rx.internal.operators.OperatorObserveOn$ObserveOnSubscriber.onNext(OperatorObserveOn.java:162) at rx.internal.operators.OnSubscribeTimerPeriodically$1.call(OnSubscribeTimerPeriodically.java:52) at rx.Scheduler$Worker$1.call(Scheduler.java:134) at rx.internal.schedulers.EventLoopsScheduler$EventLoopWorker$2.call(EventLoopsScheduler.java:187) at rx.internal.schedulers.ScheduledAction.run(ScheduledAction.java:55) at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:428) at java.util.concurrent.FutureTask.run(FutureTask.java:237) at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:272) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1133) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:607) at java.lang.Thread.run(Thread.java:761) rx.exceptions.MissingBackpressureException at rx.internal.operators.OperatorObserveOn$ObserveOnSubscriber.onNext(OperatorObserveOn.java:162) at rx.internal.operators.OnSubscribeTimerPeriodically$1.call(OnSubscribeTimerPeriodically.java:52) at rx.Scheduler$Worker$1.call(Scheduler.java:134) at rx.internal.schedulers.EventLoopsScheduler$EventLoopWorker$2.call(EventLoopsScheduler.java:187) at rx.internal.schedulers.ScheduledAction.run(ScheduledAction.java:55) at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:428) at java.util.concurrent.FutureTask.run(FutureTask.java:237) at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:272) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1133) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:607) at java.lang.Thread.run(Thread.java:761)
查看MissingBackpressureException异常类的源代码如下描述:
* Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * http://www.apache.org/licenses/LICENSE-2.0 * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. package rx.exceptions; * Represents an exception that indicates that a Subscriber or operator attempted to apply reactive pull * backpressure to an Observable that does not implement it. * p * If an Observable has not been written to support reactive pull backpressure (such support is not a * requirement for Observables), you can apply one of the following operators to it, each of which forces a * simple form of backpressure behavior: * dl * dt code onBackpressureBuffer /code /dt * dd maintains a buffer of all emissions from the source Observable and emits them to downstream Subscribers * according to the requests they generate /dd * dt code onBackpressureDrop /code /dt * dd drops emissions from the source Observable unless there is a pending request from a downstream * Subscriber, in which case it will emit enough items to fulfill the request /dd * /dl * If you do not apply either of these operators to an Observable that does not support backpressure, and if * either you as the Subscriber or some operator between you and the Observable attempts to apply reactive pull * backpressure, you will encounter a {@code MissingBackpressureException} which you will be notified of via * your {@code onError} callback. * p * There are, however, other options. You can throttle an over-producing Observable with operators like * {@code sample}/{@code throttleLast}, {@code throttleFirst}, or {@code throttleWithTimeout}/{@code debounce}. * You can also take the large number of items emitted by an over-producing Observable and package them into * a smaller set of emissions by using operators like {@code buffer} and {@code window}. * p * For a more complete discussion of the options available to you for dealing with issues related to * backpressure and flow control in RxJava, see * a href="https://github.com/ReactiveX/RxJava/wiki/Backpressure" RxJava wiki: Backpressure /a . public class MissingBackpressureException extends Exception { private static final long serialVersionUID = 7250870679677032194L; * Constructs the exception without any custom message. public MissingBackpressureException() { * Constructs the exception with the given customized message. * @param message the customized message public MissingBackpressureException(String message) { super(message);
如上面文字描述,抛出MissingBackpressureException往往就是因为,被观察者发送事件的速度太快,而观察者处理太慢,而且你还没有做相应措施,所以报异常。
onBackpressurebuffer:
把observable发送出来的事件做缓存,当request方法被调用的时候,给下层流发送一个item(如果给这个缓存区设置了大小,那么超过了这个大小就会抛出异常)。
onBackpressureDrop:
将observable发送的事件抛弃掉,直到subscriber再次调用request(n)方法的时候,就发送给它这之后的n个事件。
下面参考了几篇博客之后,理解了Backpressure的概念,并解决了rx.exceptions.MissingBackpressureException异常,读者也可以参考以下几篇博客之后,自己根据自己实际情况来选择不同的解决方式来解决该异常。
参考链接如下:
关于Java/Android开发中常见异常错误汇总___知识导图分享 1.若博客图片不清晰,可以鼠标点击图片查看,或者下载到电脑中查看,也可以留言给我,发你高清原图。希望对热爱学习的朋友有帮助! 2.这是汇总的关于Java/Android开发中常见的异常、错误,绘制导图如下:
字节卷动 You will never know how excellent you are unless you impel yourself once.
相关文章
- android studio 安装总结
- 关于android工具链
- android. 长图加载
- Android OnLowMemory和OnTrimMemory
- Rxjava + retrofit + dagger2 + mvp搭建Android框架
- 46道面试题带你了解中高级Android面试,知乎上转疯了!
- 转:给 Android 开发者的 RxJava 详解
- Android 经典笔记之五:DownloadManager下载管理器介绍
- 给 Android 开发者的 RxJava 详解
- Android RxJava使用场景小结
- Android RxJava+Retrofit 一次(合并)请求多个接口
- Android 自动滚动的GridView
- Android EventBus实战
- 带你学开源项目:Meizhi Android之RxJava & Retrofit最佳实践
- (转)libhybris及EGL Platform-在Glibc生态中重用Android的驱动
- 揭开Flutter工程编译的面纱(Android篇)
- Android 四大组件(一)Activity
- Android中Handler的使用
- 我的Android进阶之旅------>Ubuntu下不能识别Android设备的解决方法