android - rxbus连续发送消息报错

【字号: 日期:2022-11-20浏览:22作者:雯心

问题描述

我用rxbus连续发送大量消息报错

08-26 09:37:13.458 10637-10637/com.dituwuyou E/AndroidRuntime: FATAL EXCEPTION: main

Process: com.dituwuyou, PID: 10637 java.lang.IllegalStateException: Exception thrown on Scheduler.Worker thread. Add `onError` handling. at rx.android.schedulers.LooperScheduler$ScheduledAction.run(LooperScheduler.java:112) at android.os.Handler.handleCallback(Handler.java:739) at android.os.Handler.dispatchMessage(Handler.java:95) at android.os.Looper.loop(Looper.java:148) at android.app.ActivityThread.main(ActivityThread.java:5417) at java.lang.reflect.Method.invoke(Native Method) at com.android.internal.os.ZygoteInit$MethodAndArgsCaller.run(ZygoteInit.java:726) at com.android.internal.os.ZygoteInit.main(ZygoteInit.java:616)Caused by: rx.exceptions.OnErrorNotImplementedException: PublishSubject: could not emit value due to lack of requests at rx.internal.util.InternalObservableUtils$ErrorNotImplementedAction.call(InternalObservableUtils.java:386) at rx.internal.util.InternalObservableUtils$ErrorNotImplementedAction.call(InternalObservableUtils.java:383) at rx.internal.util.ActionSubscriber.onError(ActionSubscriber.java:44) at rx.observers.SafeSubscriber._onError(SafeSubscriber.java:152) at rx.observers.SafeSubscriber.onError(SafeSubscriber.java:115) at rx.internal.operators.OperatorObserveOn$ObserveOnSubscriber.checkTerminated(OperatorObserveOn.java:276) at rx.internal.operators.OperatorObserveOn$ObserveOnSubscriber.call(OperatorObserveOn.java:219) at rx.android.schedulers.LooperScheduler$ScheduledAction.run(LooperScheduler.java:107) at android.os.Handler.handleCallback(Handler.java:739) at android.os.Handler.dispatchMessage(Handler.java:95) at android.os.Looper.loop(Looper.java:148) at android.app.ActivityThread.main(ActivityThread.java:5417) at java.lang.reflect.Method.invoke(Native Method) at com.android.internal.os.ZygoteInit$MethodAndArgsCaller.run(ZygoteInit.java:726) at com.android.internal.os.ZygoteInit.main(ZygoteInit.java:616)Caused by: rx.exceptions.MissingBackpressureException: PublishSubject: could not emit value due to lack of requests at rx.subjects.PublishSubject$PublishSubjectProducer.onNext(PublishSubject.java:308) at rx.subjects.PublishSubject$PublishSubjectState.onNext(PublishSubject.java:220) at rx.subjects.PublishSubject.onNext(PublishSubject.java:73) at rx.observers.SerializedObserver.onNext(SerializedObserver.java:92) at rx.subjects.SerializedSubject.onNext(SerializedSubject.java:67) at com.dituwuyou.widget.rxjava.RxBus.send(RxBus.java:45) at com.dituwuyou.joint.CoorSocketService.messageReceived(CoorSocketService.java:51) at com.dituwuyou.fayeclient.FayeClient.parseFayeMessage(FayeClient.java:535) at com.dituwuyou.fayeclient.FayeClient.onMessage(FayeClient.java:390) at com.dituwuyou.fayeclient.HybiParser.emitFrame(HybiParser.java:304) at com.dituwuyou.fayeclient.HybiParser.start(HybiParser.java:130) at com.dituwuyou.fayeclient.WebSocketClient$1.run(WebSocketClient.java:119) at java.lang.Thread.run(Thread.java:818)

我的rxbus是这样定义的

import rx.Observable;import rx.subjects.PublishSubject;import rx.subjects.SerializedSubject;import rx.subjects.Subject;/** * Created by xg on 2016/6/24. * 消息传递(替换handler,eventbus) */public class RxBus { private static volatile RxBus mInstance; private final Subject bus; public RxBus() {bus = new SerializedSubject<>(PublishSubject.create()); } /** * 单例模式RxBus * * @return */ public static RxBus getRxBusSingleton() {RxBus rxBus2 = mInstance;if (mInstance == null) { synchronized (RxBus.class) {rxBus2 = mInstance;if (mInstance == null) { rxBus2 = new RxBus(); mInstance = rxBus2;} }}return rxBus2; } /** * 发送消息 * * @param object */ public void send(Object object) {bus.onNext(object); } /** * 接收消息 * * @return */ public Observable toObserverable() {return bus; }}

这是第45行 bus.onNext(object);应该是出现背压了

问题解答

回答1:

1.看不懂你为什么要加个rxBus22.要处理onError的情况3.先注册 后send

回答2:

用的什么版本的RxJava. 在1.1.6版本中已经没有rx.subjects.PublishSubject$PublishSubjectProducer这个类了

相关文章: