zl程序教程

您现在的位置是:首页 >  移动开发

当前栏目

Android 写一个属于自己的Rxjava(二)详解手机开发

Androidrxjava手机开发 详解 一个 自己 属于
2023-06-13 09:20:15 时间
目录

Android 写一个属于自己的Rxjava(一)

Android 写一个属于自己的Rxjava(二)

前言

上一篇实现了Rxjava基本的Observable和map操作符的实现,接下来需要实现Rxjava最重要的线程切换和复杂的操作符:

subscribeOn() observeOn() from() zip() flatmap()

先附上github源码

subscribeOn()

subscribeOn()作用在上游的发射,先定义一个CustomScheduler,提供执行任务的接口。

public class CustomScheduler {

 private final Executor executor; 

 public CustomScheduler(Executor executor) {

 this.executor = executor; 

 public CustomWorker createWorker() {

 return new CustomWorker(executor); 

 public static class CustomWorker{

 private final Executor executor; 

 public CustomWorker(Executor executor) {

 this.executor = executor; 

 public void schedule(Runnable runnable) {

 executor.execute(runnable); 

我们可以定义多种多样的CustomScheduler,指定执行在什么线程或者线程池。我们还可以造一个执行在主线程的Scheduler,就可以达到AndroidSchedulers.mainThread()一样的效果。

继续在CustomObservable中提供subscribeOn()的方法:

 // CustomObservable 

 public CustomObservable T subscribeOn(CustomScheduler scheduler) {

 return new CustomObservableSubscribeOn(this, scheduler); 

跟上篇文章一样,生成了CustomObservableSubscribeOn来封装上游和下游。CustomObservableSubscribeOn的实现也很简单,只是将上游的执行扔进CustomScheduler线程池里面执行,下游Observer不需要做什么动作。

 

class CustomObservableSubscribeOn T extends CustomObservable T {

 private CustomObservableSource T source; 

 private CustomScheduler scheduler; 

 public CustomObservableSubscribeOn(CustomObservableSource T source, CustomScheduler scheduler) {

 this.source = source; 

 this.scheduler = scheduler; 

 @Override 

 protected void subscribeActual(final CustomObserver observer) {

 final CustomSubscribeOnObserver subscribeOnObserver = new CustomSubscribeOnObserver(observer); 

 CustomScheduler.CustomWorker worker = scheduler.createWorker(); 

 worker.schedule(new Runnable() {

 @Override 

 public void run() {

 // 将任务执行扔进CustomScheduler 

 source.subscribe(subscribeOnObserver); 

 }); 

 private static final class CustomSubscribeOnObserver T implements CustomObserver T {

 final CustomObserver ? super T actual; 

 CustomSubscribeOnObserver(CustomObserver ? super T actual) {

 this.actual = actual; 

 @Override 

 public void onStart() {

 actual.onStart(); 

 @Override 

 public void onNext(T t) {

 actual.onNext(t); 

 @Override 

 public void onError(Throwable error) {

 actual.onError(error); 

 @Override 

 public void onComplete() {

 actual.onComplete(); 

ObserveOn()

其实本质跟subscribeOn是一样的,区别在于ObserveOn()作用在下游的observer中。

提供ObserverOn()方法

 // CustomObservable 

 public CustomObservable T observeOn(CustomScheduler scheduler) {

 return new CustomObservableObserveOn(this, scheduler); 

继续新建CustomObservableObserveOn类,只需要将回调事件onNext等扔进CustomScheduler的线程池就完成任务了。

class CustomObservableObserveOn T extends CustomObservable T {

 private CustomObservableSource T source; 

 private CustomScheduler scheduler; 

 public CustomObservableObserveOn(CustomObservableSource source, CustomScheduler scheduler) {

 this.source = source; 

 this.scheduler = scheduler; 

 @Override 

 protected void subscribeActual(CustomObserver observer) {

 CustomScheduler.CustomWorker worker = scheduler.createWorker(); 

 CustomObserverObserveOn observerObserveOn = new CustomObserverObserveOn T (observer, worker); 

 source.subscribe(observerObserveOn); 

 private static class CustomObserverObserveOn T implements CustomObserver T {

 private CustomObserver T observer; 

 private CustomScheduler.CustomWorker worker; 

 public CustomObserverObserveOn(CustomObserver T observer, CustomScheduler.CustomWorker worker) {

 this.observer = observer; 

 this.worker = worker; 

 @Override 

 public void onStart() {

 this.worker.schedule(new Runnable() {

 @Override 

 public void run() {

 observer.onStart(); 

 }); 

 @Override 

 public void onNext(final T t) {

 this.worker.schedule(new Runnable() {

 @Override 

 public void run() {

 observer.onNext(t); 

 }); 

 @Override 

 public void onError(final Throwable e) {

 this.worker.schedule(new Runnable() {

 @Override 

 public void run() {

 observer.onError(e); 

 }); 

 @Override 

 public void onComplete() {

 this.worker.schedule(new Runnable() {

 @Override 

 public void run() {

 observer.onComplete(); 

 }); 

from()

rxjava用fromIterable 操作符可以逐次发射list的中的数据。

怎么简单实现一个封装多个值的Observable。其实也不难,就是执行subscribeOn()后,多次调用onNext()发射数据。

 // CustomObservable 

 public static T CustomObservable T from(Iterable T values) {

 return new CustomObservableIterable (values); 

继续造CustomObservableIterable

class CustomObservableIterable T extends CustomObservable {

 private Iterable T valueIter; 

 public CustomObservableIterable(Iterable T valueIter) {

 this.valueIter = valueIter; 

 @Override 

 protected void subscribeActual(CustomObserver observer) {

 CustomIterableObserver T iterableObserver = new CustomIterableObserver (valueIter, observer); 

 CustomInterableSource source = new CustomInterableSource(); 

 source.subscribe(iterableObserver); 

 private class CustomInterableSource implements CustomObservableSource {

 @Override 

 public void subscribe(CustomObserver observer) {

 observer.onStart(); 

 observer.onNext(null); 

 observer.onComplete(); 

 private static class CustomIterableObserver T implements CustomObserver T {

 private Iterable T valueIter; 

 private CustomObserver T observer; 

 CustomIterableObserver(Iterable T valueIter, CustomObserver T observer) {

 this.valueIter = valueIter; 

 this.observer = observer; 

 @Override 

 public void onStart() {

 this.observer.onStart(); 

 @Override 

 public void onNext(T t) {

 for (T value : valueIter) {

 this.observer.onNext(value); 

 @Override 

 public void onError(Throwable e) {

 this.observer.onError(e); 

 @Override 

 public void onComplete() {

 this.observer.onComplete(); 

zip

网上把zip说得好复杂,每次我都没看明白,其实zip用起来很简单,就是将多个上游的发射请求执行结果混合在一起,统一发射给同一个下游observer。但是要注意的是,多个上游的是一一对应混合的。

任务A的执行的结果是[1, 2, 3]
任务B的执行的结果是[1, 2]
混合规则是加法,那么最后的结果是什么?

结果是:[2, 4]
因为B没有结果跟A的3对应,所以抛弃了A的3。

zip的实现比较复杂,同样先提供一个对外的静态方法

// CustomObservable 

public static T, U, R CustomObservable R zip(final CustomObservableSource T o1, 

 final CustomObservableSource U o2, 

 CustomBiFunction T, U, R mapper) {

 List CustomObservableSource ? list = Arrays.asList(o1, o2); 

 CustomFunction Object[], R arrayFunc = new CustomFunctions.Array2Func(mapper); 

 return new CustomObservableZip(list, arrayFunc); 

public interface CustomBiFunction T, U, R {

 R apply(T t, U u); 

我们将CustomBitFunction转换成CustomFunction Object[], R ,更有通配性,简单理解就是表示多个CustomObservableSource转换成R结果。至于如何转换,直接看上面的github源码。

public class CustomObservableZip T, U, R extends CustomObservable T {

 List CustomObservableSource T sources; 

 CustomFunction Object[], R mapper; 

 public CustomObservableZip(List CustomObservableSource T sources, CustomFunction Object[], R mapper) {

 this.sources = sources; 

 this.mapper = mapper; 

 @Override 

 protected void subscribeActual(CustomObserver observer) {

 ZipCoordinator zipCoordinator = new ZipCoordinator(observer, sources, mapper); 

 zipCoordinator.subscribe(); 

 static final class ZipCoordinator T, R {

 CustomObserver R actual; 

 List CustomObservableSource T sources; 

 List ZipObserver T, R observers; 

 CustomFunction Object[], R mapper; 

 int size; 

 boolean isFinish; 

 ZipCoordinator(CustomObserver R observer, 

 List CustomObservableSource T sources, 

 CustomFunction Object[], R mapper) {

 this.actual = observer; 

 this.sources = sources; 

 this.mapper = mapper; 

 this.size = sources.size(); 

 this.observers = new ArrayList (size); 

 this.isFinish = false; 

 public void subscribe() {

 actual.onStart(); 

 for (int i = 0; i size; i++) {

 ZipObserver observer = new ZipObserver T, R (this); 

 observers.add(observer); 

 for (int i = 0; i size; i++) {

 sources.get(i).subscribe(observers.get(i)); 

 void drain() {

 if (isFinish) {

 return; 

 boolean canMerge = true; 

 boolean isDone = true; 

 for (ZipObserver T, R observer: observers) {

 if (!observer.isDone) {

 isDone = false; 

 if (observer.queue.isEmpty()) {

 canMerge = false; 

 if (canMerge) {

 List T mergeList = new ArrayList (size); 

 for (ZipObserver T, R observer: observers) {

 T t = observer.queue.poll(); 

 mergeList.add(t); 

 actual.onNext(mapper.apply(mergeList.toArray())); 

 if (isDone) {

 actual.onComplete(); 

 static class ZipObserver T, R implements CustomObserver T {

 boolean isDone; 

 ZipCoordinator T, R parent; 

 Queue T queue; 

 Throwable error; 

 public ZipObserver(ZipCoordinator parent) {

 this.parent = parent; 

 this.queue = new LinkedList (); 

 this.isDone = false; 

 @Override 

 public void onStart() {


事实上Rxjava的zip实现比上面复杂多一些。
简单说下我的实现方式,就是为每一个CustomObservableSource提供一个ZipObserver,内部存储着自己的计算结果,每次执行完任务调用onNext的时候,就去看下是不是所有的zipObserver的队列都是有计算结果的,如果是,就将结果混合之后发射出去。

flatmap
 public R CustomObservable R flatMap(CustomFunction T, CustomObservableSource R function) {

 return new CustomObservableFlatMap(this, function); 

其实flatmap跟map的区别在于,前者是将值转换成一个Observable,而后者将值转换成另外种类型的值。

public class CustomObservableFlatMap T, R extends CustomObservable {

 private CustomObservableSource T source; 

 private CustomFunction T, CustomObservableSource R mapper; 

 public CustomObservableFlatMap(CustomObservableSource T source, CustomFunction T, CustomObservableSource R mapper) {

 this.source = source; 

 this.mapper = mapper; 

 @Override 

 protected void subscribeActual(CustomObserver observer) {

 CustomFlatMapObserver T, R flatMapObserver = new CustomFlatMapObserver(observer, mapper); 

 source.subscribe(flatMapObserver); 

 private static class CustomFlatMapObserver T, R implements CustomObserver T {

 private CustomObserver R observer; 

 private CustomFunction T, CustomObservableSource R mapper; 

 public CustomFlatMapObserver(CustomObserver R observer, CustomFunction T, CustomObservableSource R mapper) {

 this.observer = observer; 

 this.mapper = mapper; 

 @Override 

 public void onStart() {

 observer.onStart(); 

 @Override 

 public void onNext(T t) {

 CustomObservableSource R source = mapper.apply(t); 

 InnerObserver R innerObserver = new InnerObserver (observer); 

 source.subscribe(innerObserver); 

 @Override 

 public void onError(Throwable e) {

 observer.onError(e); 

 @Override 

 public void onComplete() {

 observer.onComplete(); 

 private static class InnerObserver R implements CustomObserver R {

 private CustomObserver R observer; 

 InnerObserver(CustomObserver R observer) {

 this.observer = observer; 

 @Override 

 public void onStart() {