[My Android Advancement Journey] RxJava: Understanding Backpressure and Resolving rx.exceptions.MissingBackpressureException

Date: 2023-09-27 14:29:23
java.lang.IllegalStateException: Exception thrown on Scheduler.Worker thread. Add `onError` handling.
    at rx.android.schedulers.LooperScheduler$ScheduledAction.run(LooperScheduler.java:112)
    at android.os.Handler.handleCallback(Handler.java:751)
    at android.os.Handler.dispatchMessage(Handler.java:95)
    at android.os.Looper.loop(Looper.java:154)
    at android.app.ActivityThread.main(ActivityThread.java:6119)
    at java.lang.reflect.Method.invoke(Native Method)
    at com.android.internal.os.ZygoteInit$MethodAndArgsCaller.run(ZygoteInit.java:886)
    at com.android.internal.os.ZygoteInit.main(ZygoteInit.java:776)
Caused by: rx.exceptions.OnErrorNotImplementedException
    at rx.internal.util.InternalObservableUtils$ErrorNotImplementedAction.call(InternalObservableUtils.java:386)
    at rx.internal.util.InternalObservableUtils$ErrorNotImplementedAction.call(InternalObservableUtils.java:383)
    at rx.internal.util.ActionSubscriber.onError(ActionSubscriber.java:44)
    at rx.observers.SafeSubscriber._onError(SafeSubscriber.java:157)
    at rx.observers.SafeSubscriber.onError(SafeSubscriber.java:120)
    at rx.internal.operators.OperatorObserveOn$ObserveOnSubscriber.checkTerminated(OperatorObserveOn.java:276)
    at rx.internal.operators.OperatorObserveOn$ObserveOnSubscriber.call(OperatorObserveOn.java:219)
    at rx.android.schedulers.LooperScheduler$ScheduledAction.run(LooperScheduler.java:107)
    ... 7 more
Caused by: rx.exceptions.MissingBackpressureException
    at rx.internal.operators.OperatorObserveOn$ObserveOnSubscriber.onNext(OperatorObserveOn.java:162)
    at rx.internal.operators.OnSubscribeTimerPeriodically$1.call(OnSubscribeTimerPeriodically.java:52)
    at rx.Scheduler$Worker$1.call(Scheduler.java:134)
    at rx.internal.schedulers.EventLoopsScheduler$EventLoopWorker$2.call(EventLoopsScheduler.java:187)
    at rx.internal.schedulers.ScheduledAction.run(ScheduledAction.java:55)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:428)
    at java.util.concurrent.FutureTask.run(FutureTask.java:237)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:272)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1133)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:607)
    at java.lang.Thread.run(Thread.java:761)


The source code of the MissingBackpressureException class carries the following description:


/*
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package rx.exceptions;

/**
 * Represents an exception that indicates that a Subscriber or operator attempted to apply reactive pull
 * backpressure to an Observable that does not implement it.
 * <p>
 * If an Observable has not been written to support reactive pull backpressure (such support is not a
 * requirement for Observables), you can apply one of the following operators to it, each of which forces a
 * simple form of backpressure behavior:
 * <dl>
 * <dt><code>onBackpressureBuffer</code></dt>
 * <dd>maintains a buffer of all emissions from the source Observable and emits them to downstream Subscribers
 * according to the requests they generate</dd>
 * <dt><code>onBackpressureDrop</code></dt>
 * <dd>drops emissions from the source Observable unless there is a pending request from a downstream
 * Subscriber, in which case it will emit enough items to fulfill the request</dd>
 * </dl>
 * If you do not apply either of these operators to an Observable that does not support backpressure, and if
 * either you as the Subscriber or some operator between you and the Observable attempts to apply reactive pull
 * backpressure, you will encounter a {@code MissingBackpressureException} which you will be notified of via
 * your {@code onError} callback.
 * <p>
 * There are, however, other options. You can throttle an over-producing Observable with operators like
 * {@code sample}/{@code throttleLast}, {@code throttleFirst}, or {@code throttleWithTimeout}/{@code debounce}.
 * You can also take the large number of items emitted by an over-producing Observable and package them into
 * a smaller set of emissions by using operators like {@code buffer} and {@code window}.
 * <p>
 * For a more complete discussion of the options available to you for dealing with issues related to
 * backpressure and flow control in RxJava, see
 * <a href="https://github.com/ReactiveX/RxJava/wiki/Backpressure">RxJava wiki: Backpressure</a>.
 */
public class MissingBackpressureException extends Exception {

    private static final long serialVersionUID = 7250870679677032194L;

    /**
     * Constructs the exception without any custom message.
     */
    public MissingBackpressureException() {
    }

    /**
     * Constructs the exception with the given customized message.
     * @param message the customized message
     */
    public MissingBackpressureException(String message) {
        super(message);
    }
}

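As the description above says, MissingBackpressureException is usually thrown because the Observable emits events faster than the observer can process them and no backpressure handling has been put in place. In the stack trace above, a periodic timer source (OnSubscribeTimerPeriodically) overruns the queue inside observeOn, and because the subscriber supplies no onError handler, the failure surfaces as OnErrorNotImplementedException.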
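To make the failure mode concrete, here is a minimal plain-JDK sketch of a fast producer feeding a slow consumer through a fixed-size queue. It is a model, not RxJava code: `OverflowModel`, `OverflowException`, and the capacity of 16 are hypothetical names and values chosen for illustration, and `OverflowException` merely stands in for rx.exceptions.MissingBackpressureException.

```java
import java.util.ArrayDeque;
import java.util.Queue;

// Plain-JDK model (an assumption for illustration, not RxJava internals):
// a fixed-capacity queue sits between a producer and a slower consumer.
class OverflowModel {

    /** Hypothetical stand-in for rx.exceptions.MissingBackpressureException. */
    static class OverflowException extends RuntimeException {
        OverflowException(String message) {
            super(message);
        }
    }

    /**
     * Produces `produced` items; the consumer takes one item from the queue
     * after every `consumeEvery` produced items.
     * Returns how many items the consumer received, or throws once the
     * producer finds the queue already full.
     */
    static int run(int capacity, int produced, int consumeEvery) {
        Queue<Integer> queue = new ArrayDeque<>();
        int delivered = 0;
        for (int i = 1; i <= produced; i++) {
            if (queue.size() == capacity) {
                // Nothing asked the producer to slow down, so the only option left is to fail.
                throw new OverflowException("queue full at item " + i);
            }
            queue.add(i);
            if (i % consumeEvery == 0) {
                queue.poll();
                delivered++;
            }
        }
        return delivered;
    }

    public static void main(String[] args) {
        // Consumer keeps pace with the producer: every item is delivered.
        System.out.println(run(16, 100, 1));
        try {
            // Consumer handles only one item in four: the queue eventually fills up.
            run(16, 100, 4);
        } catch (OverflowException e) {
            System.out.println("overflow: " + e.getMessage());
        }
    }
}
```

The point of the model is that the error depends on the ratio between production and consumption speed, not on the total number of items.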


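onBackpressureBuffer:
Caches the events emitted by the Observable and passes them downstream as the subscriber calls request (if a capacity is set for the buffer, exceeding that capacity raises an exception).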
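The buffering behaviour can be sketched with a small plain-JDK model. This is not the RxJava operator itself: `BufferModel` and its methods are hypothetical names, a capacity of 0 is this sketch's own convention for "unbounded", and `IllegalStateException` stands in for the error the real operator signals on overflow.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;

// Plain-JDK model of the buffering strategy (an illustration, not RxJava's implementation).
class BufferModel {
    private final ArrayDeque<Integer> buffer = new ArrayDeque<>();
    private final List<Integer> delivered = new ArrayList<>();
    private final int capacity; // 0 means unbounded in this sketch
    private long requested;     // outstanding downstream demand

    BufferModel(int capacity) {
        this.capacity = capacity;
    }

    /** Upstream pushes an item regardless of demand; it is cached until requested. */
    void onNext(int item) {
        if (capacity > 0 && buffer.size() == capacity) {
            throw new IllegalStateException("buffer overflow");
        }
        buffer.add(item);
        drain();
    }

    /** Downstream asks for n more items. */
    void request(long n) {
        requested += n;
        drain();
    }

    /** Hands over cached items while there is demand left. */
    private void drain() {
        while (requested > 0 && !buffer.isEmpty()) {
            delivered.add(buffer.poll());
            requested--;
        }
    }

    List<Integer> delivered() {
        return delivered;
    }

    public static void main(String[] args) {
        BufferModel model = new BufferModel(0);
        for (int i = 1; i <= 5; i++) {
            model.onNext(i);
        }
        model.request(2);
        System.out.println(model.delivered()); // [1, 2]
    }
}
```

In RxJava 1.x the real operator is applied in the chain as `onBackpressureBuffer()`, typically placed before `observeOn`, so that the cache absorbs the burst instead of the bounded queue inside `observeOn`.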

onBackpressureDrop:
Discards the events emitted by the Observable until the subscriber calls request(n) again, and then delivers the next n events emitted after that call.
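The dropping behaviour can be modelled the same way. Again this is a plain-JDK sketch under stated assumptions, not the RxJava operator: `DropModel` and its methods are hypothetical names chosen for illustration.

```java
import java.util.ArrayList;
import java.util.List;

// Plain-JDK model of the dropping strategy (an illustration, not RxJava's implementation).
class DropModel {
    private final List<Integer> delivered = new ArrayList<>();
    private long requested; // outstanding downstream demand
    private int dropped;    // how many items were discarded

    /** Downstream asks for n more items. */
    void request(long n) {
        requested += n;
    }

    /** Upstream pushes an item; it is delivered only if demand is outstanding. */
    void onNext(int item) {
        if (requested > 0) {
            delivered.add(item);
            requested--;
        } else {
            dropped++;
        }
    }

    List<Integer> delivered() {
        return delivered;
    }

    int dropped() {
        return dropped;
    }

    public static void main(String[] args) {
        DropModel model = new DropModel();
        model.onNext(1);  // no demand yet: dropped
        model.onNext(2);  // dropped
        model.request(2);
        model.onNext(3);  // delivered
        model.onNext(4);  // delivered
        model.onNext(5);  // demand used up: dropped
        System.out.println(model.delivered()); // [3, 4]
        System.out.println(model.dropped());   // 3
    }
}
```

Compared with buffering, this strategy uses no extra memory but loses data, so it suits sources where only recent values matter, such as the periodic timer in the stack trace. In RxJava 1.x the real operator is applied in the chain as `onBackpressureDrop()`.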


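After reading several blog posts I came to understand the concept of backpressure and resolved the rx.exceptions.MissingBackpressureException. Readers can consult the same posts and pick whichever solution fits their own situation.

Reference links: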

