Android 写一个属于自己的Rxjava(二)详解手机开发
Android 写一个属于自己的Rxjava(一)
Android 写一个属于自己的Rxjava(二)
前言上一篇实现了Rxjava基本的Observable和map操作符的实现,接下来需要实现Rxjava最重要的线程切换和复杂的操作符:
subscribeOn() observeOn() from() zip() flatmap()先附上github源码
subscribeOn()subscribeOn()作用在上游的发射,先定义一个CustomScheduler,提供执行任务的接口。
public class CustomScheduler { private final Executor executor; public CustomScheduler(Executor executor) { this.executor = executor; public CustomWorker createWorker() { return new CustomWorker(executor); public static class CustomWorker{ private final Executor executor; public CustomWorker(Executor executor) { this.executor = executor; public void schedule(Runnable runnable) { executor.execute(runnable);
我们可以定义多种多样的CustomScheduler,指定执行在什么线程或者线程池。我们还可以造一个执行在主线程的Scheduler,就可以达到AndroidSchedulers.mainThread()一样的效果。
继续在CustomObservable中提供subscribeOn()的方法:
// CustomObservable public CustomObservable T subscribeOn(CustomScheduler scheduler) { return new CustomObservableSubscribeOn(this, scheduler);
跟上篇文章一样,生成了CustomObservableSubscribeOn来封装上游和下游。CustomObservableSubscribeOn的实现也很简单,只是将上游的执行扔进CustomScheduler线程池里面执行,下游Observer不需要做什么动作。
class CustomObservableSubscribeOn T extends CustomObservable T { private CustomObservableSource T source; private CustomScheduler scheduler; public CustomObservableSubscribeOn(CustomObservableSource T source, CustomScheduler scheduler) { this.source = source; this.scheduler = scheduler; @Override protected void subscribeActual(final CustomObserver observer) { final CustomSubscribeOnObserver subscribeOnObserver = new CustomSubscribeOnObserver(observer); CustomScheduler.CustomWorker worker = scheduler.createWorker(); worker.schedule(new Runnable() { @Override public void run() { // 将任务执行扔进CustomScheduler source.subscribe(subscribeOnObserver); }); private static final class CustomSubscribeOnObserver T implements CustomObserver T { final CustomObserver ? super T actual; CustomSubscribeOnObserver(CustomObserver ? super T actual) { this.actual = actual; @Override public void onStart() { actual.onStart(); @Override public void onNext(T t) { actual.onNext(t); @Override public void onError(Throwable error) { actual.onError(error); @Override public void onComplete() { actual.onComplete();ObserveOn()
其实本质跟subscribeOn是一样的,区别在于ObserveOn()作用在下游的observer中。
提供ObserverOn()方法
// CustomObservable public CustomObservable T observeOn(CustomScheduler scheduler) { return new CustomObservableObserveOn(this, scheduler);
继续新建CustomObservableObserveOn类,只需要将回调事件onNext等扔进CustomScheduler的线程池就完成任务了。
class CustomObservableObserveOn T extends CustomObservable T { private CustomObservableSource T source; private CustomScheduler scheduler; public CustomObservableObserveOn(CustomObservableSource source, CustomScheduler scheduler) { this.source = source; this.scheduler = scheduler; @Override protected void subscribeActual(CustomObserver observer) { CustomScheduler.CustomWorker worker = scheduler.createWorker(); CustomObserverObserveOn observerObserveOn = new CustomObserverObserveOn T (observer, worker); source.subscribe(observerObserveOn); private static class CustomObserverObserveOn T implements CustomObserver T { private CustomObserver T observer; private CustomScheduler.CustomWorker worker; public CustomObserverObserveOn(CustomObserver T observer, CustomScheduler.CustomWorker worker) { this.observer = observer; this.worker = worker; @Override public void onStart() { this.worker.schedule(new Runnable() { @Override public void run() { observer.onStart(); }); @Override public void onNext(final T t) { this.worker.schedule(new Runnable() { @Override public void run() { observer.onNext(t); }); @Override public void onError(final Throwable e) { this.worker.schedule(new Runnable() { @Override public void run() { observer.onError(e); }); @Override public void onComplete() { this.worker.schedule(new Runnable() { @Override public void run() { observer.onComplete(); });from()
rxjava用fromIterable 操作符可以逐次发射list的中的数据。
怎么简单实现一个封装多个值的Observable。其实也不难,就是执行subscribeOn()后,多次调用onNext()发射数据。
// CustomObservable public static T CustomObservable T from(Iterable T values) { return new CustomObservableIterable (values);
继续造CustomObservableIterable
class CustomObservableIterable T extends CustomObservable { private Iterable T valueIter; public CustomObservableIterable(Iterable T valueIter) { this.valueIter = valueIter; @Override protected void subscribeActual(CustomObserver observer) { CustomIterableObserver T iterableObserver = new CustomIterableObserver (valueIter, observer); CustomInterableSource source = new CustomInterableSource(); source.subscribe(iterableObserver); private class CustomInterableSource implements CustomObservableSource { @Override public void subscribe(CustomObserver observer) { observer.onStart(); observer.onNext(null); observer.onComplete(); private static class CustomIterableObserver T implements CustomObserver T { private Iterable T valueIter; private CustomObserver T observer; CustomIterableObserver(Iterable T valueIter, CustomObserver T observer) { this.valueIter = valueIter; this.observer = observer; @Override public void onStart() { this.observer.onStart(); @Override public void onNext(T t) { for (T value : valueIter) { this.observer.onNext(value); @Override public void onError(Throwable e) { this.observer.onError(e); @Override public void onComplete() { this.observer.onComplete();zip
网上把zip说得好复杂,每次我都没看明白,其实zip用起来很简单,就是将多个上游的发射请求执行结果混合在一起,统一发射给同一个下游observer。但是要注意的是,多个上游的是一一对应混合的。
任务A的执行的结果是[1, 2, 3]
任务B的执行的结果是[1, 2]
混合规则是加法,那么最后的结果是什么?
结果是:[2, 4]
因为B没有结果跟A的3对应,所以抛弃了A的3。
zip的实现比较复杂,同样先提供一个对外的静态方法
// CustomObservable public static T, U, R CustomObservable R zip(final CustomObservableSource T o1, final CustomObservableSource U o2, CustomBiFunction T, U, R mapper) { List CustomObservableSource ? list = Arrays.asList(o1, o2); CustomFunction Object[], R arrayFunc = new CustomFunctions.Array2Func(mapper); return new CustomObservableZip(list, arrayFunc); public interface CustomBiFunction T, U, R { R apply(T t, U u);
我们将CustomBitFunction转换成CustomFunction Object[], R ,更有通配性,简单理解就是表示多个CustomObservableSource转换成R结果。至于如何转换,直接看上面的github源码。
public class CustomObservableZip T, U, R extends CustomObservable T { List CustomObservableSource T sources; CustomFunction Object[], R mapper; public CustomObservableZip(List CustomObservableSource T sources, CustomFunction Object[], R mapper) { this.sources = sources; this.mapper = mapper; @Override protected void subscribeActual(CustomObserver observer) { ZipCoordinator zipCoordinator = new ZipCoordinator(observer, sources, mapper); zipCoordinator.subscribe(); static final class ZipCoordinator T, R { CustomObserver R actual; List CustomObservableSource T sources; List ZipObserver T, R observers; CustomFunction Object[], R mapper; int size; boolean isFinish; ZipCoordinator(CustomObserver R observer, List CustomObservableSource T sources, CustomFunction Object[], R mapper) { this.actual = observer; this.sources = sources; this.mapper = mapper; this.size = sources.size(); this.observers = new ArrayList (size); this.isFinish = false; public void subscribe() { actual.onStart(); for (int i = 0; i size; i++) { ZipObserver observer = new ZipObserver T, R (this); observers.add(observer); for (int i = 0; i size; i++) { sources.get(i).subscribe(observers.get(i)); void drain() { if (isFinish) { return; boolean canMerge = true; boolean isDone = true; for (ZipObserver T, R observer: observers) { if (!observer.isDone) { isDone = false; if (observer.queue.isEmpty()) { canMerge = false; if (canMerge) { List T mergeList = new ArrayList (size); for (ZipObserver T, R observer: observers) { T t = observer.queue.poll(); mergeList.add(t); actual.onNext(mapper.apply(mergeList.toArray())); if (isDone) { actual.onComplete(); static class ZipObserver T, R implements CustomObserver T { boolean isDone; ZipCoordinator T, R parent; Queue T queue; Throwable error; public ZipObserver(ZipCoordinator parent) { this.parent = parent; this.queue = new LinkedList (); this.isDone = false; @Override public void onStart() {
事实上Rxjava的zip实现比上面复杂多一些。
简单说下我的实现方式,就是为每一个CustomObservableSource提供一个ZipObserver,内部存储着自己的计算结果,每次执行完任务调用onNext的时候,就去看下是不是所有的zipObserver的队列都是有计算结果的,如果是,就将结果混合之后发射出去。
public R CustomObservable R flatMap(CustomFunction T, CustomObservableSource R function) { return new CustomObservableFlatMap(this, function);
其实flatmap跟map的区别在于,前者是将值转换成一个Observable,而后者将值转换成另外种类型的值。
public class CustomObservableFlatMap T, R extends CustomObservable { private CustomObservableSource T source; private CustomFunction T, CustomObservableSource R mapper; public CustomObservableFlatMap(CustomObservableSource T source, CustomFunction T, CustomObservableSource R mapper) { this.source = source; this.mapper = mapper; @Override protected void subscribeActual(CustomObserver observer) { CustomFlatMapObserver T, R flatMapObserver = new CustomFlatMapObserver(observer, mapper); source.subscribe(flatMapObserver); private static class CustomFlatMapObserver T, R implements CustomObserver T { private CustomObserver R observer; private CustomFunction T, CustomObservableSource R mapper; public CustomFlatMapObserver(CustomObserver R observer, CustomFunction T, CustomObservableSource R mapper) { this.observer = observer; this.mapper = mapper; @Override public void onStart() { observer.onStart(); @Override public void onNext(T t) { CustomObservableSource R source = mapper.apply(t); InnerObserver R innerObserver = new InnerObserver (observer); source.subscribe(innerObserver); @Override public void onError(Throwable e) { observer.onError(e); @Override public void onComplete() { observer.onComplete(); private static class InnerObserver R implements CustomObserver R { private CustomObserver R observer; InnerObserver(CustomObserver R observer) { this.observer = observer; @Override public void onStart() {
相关文章
- android bindservice方法,Android bindservice方法返回false
- android 验证码短信验证码,Android短信验证码倒计时验证的2种常用方式
- Android resource linking failed_android sdk location should not
- Android Services Library_android freeware
- Android studio更新后出现警告:Warning:The `android.dexOptions.incremental` property is deprecated and it has
- 【Android 逆向】Android 进程注入工具开发 ( 调试进程中寄存器的作用 | 通过 EIP 寄存器控制程序运行 | EIP 寄存器的存档与恢复 )
- 【错误记录】Android 应用配置第三方 so 动态库 ( /data/app/comxxx==/base.apk/lib/arm64-v8a]couldn‘t find “libx.so“ )
- 【ijkplayer】编译 Android 版本的 ijkplayer ⑥ ( 进入 ijkplayer-android/android 目录 | 执行 compile-ijk.sh 脚本完成编译 )
- 【Android Gradle 插件】自定义 Gradle 插件模块 ④ ( META-INF 中声明自定义插件的核心类 | 在应用中依赖本地 Maven 仓库中的自定义 Gradle 插件 )
- 【错误记录】Android Studio 编译报错 ( The project is using an incompatible version (AGP 7.4.2) of the Androi )
- Android本地图片选择并裁剪工具类详解手机开发
- Android开发中遇到的问题(四)——Android中WARNING: Application does not specify an API level requirement!的解决方法详解手机开发
- Android开发中遇到的问题(二)——新建android工程的时候eclipse没有生成MainActivity和layout布局详解手机开发
- [android] 手机卫士设备管理权限锁屏详解手机开发
- Android的DataBinding原理介绍详解手机开发
- Android恶意软件开发的新技术 | 360恶意软件专题报告
- android版本检测Android程序的版本检测与更新实现介绍
- android中可以通过两种方式调用接口发送短信
- 基于Android监听ContentProvider中数据变化的相关介绍
- Android控件(button)对齐方法实现详解
- Windows下获取Android源码方法的详解
- Android提高之模拟信号示波器的实现