zl程序教程

您现在的位置是:首页 >  移动开发

当前栏目

Android 写一个属于自己的Rxjava(一)详解手机开发

Androidrxjava手机开发 详解 一个 自己 属于
2023-06-13 09:20:15 时间
目录

Android 写一个属于自己的Rxjava(一)

Android 写一个属于自己的Rxjava(二)

背景

之所以想要自己动手写一个简单的Rxjava,并不是想证明自己多厉害,而是借助动手来理解Rxjava的思想和源码,实话实说,Rxjava是我看过的源码里面名字取得最迷糊的。看了三四天,硬是把自己搞迷糊了。

相反,我尝试自己做一个简单的轮子,发现其实Rxjava的整体思想和实现并不难,但层层的封装真是让人抓不住头脑。不过,只要抓住规律,其实理解起来还是挺简单的。

重点在于分清楚:上游发射事件,下游接收事件

哈哈,我并不是在说废话,只要分清楚哪些操作符是作用在上游,哪些作用在下游,在此基础上对上游或者下游封装多一层就成了Rxjava,这样说也也是白说,让我们动手造个简单的。

先附上github源码

CustomObservable

为了对比rxjava,我把所有的类名前面都加了Custom表示自定义的意思

定义两个下游的接收事件的基类(观察者):

CustomEmitter和CustomObserver ,其实二者是基本一样的接口

负责接收onStart、onNext、onComplete、onError事件

其中CustomObserver是给暴露给外界使用的,而是CustomEmitter封装在内部使用

CustomEmitter
// 下游接收事件,并负责暴露给外部的发射事件者 

public interface CustomEmitter T {

 void onNext(T value); 

 void onError(Throwable e); 

 void onComplete(); 

CustomObserver
// 下游接收事件 

public interface CustomObserver T {

 void onStart(); 

 void onNext(T t); 

 void onError(Throwable e); 

 void onComplete(); 

定义一个上游的执行回调事件的基类(被观察者):

CustomObservableSource 和CustomObservableOnSubscribe

负责执行subscribe(耗时)方法,并发射(调用)onStart、onNext、onComplete、onError事件

其中CustomObservableOnSubscribe是给暴露给外界使用的,而CustomObservableSource是封装在内部使用

个人不喜欢观察者和被观察者的说法,总是分不清;

所以以下用上游执行者和下游接收者

CustomObservableOnSubscribe
public interface CustomObservableOnSubscribe T {

 void subscribe(CustomEmitter T emitter); 

CustomObservableSource
public interface CustomObservableSource T {

 void subscribe(CustomObserver ? super T observer); 

CustomObservable

CustomObservable 就是一个基类,负责创建各个对应的子类,这里的create()创建了CustomObservableCreate

public abstract class CustomObservable T implements CustomObservableSource {


这里之所以CustomObservableCreate继承CustomObservable而不是CustomObservableSource,是为了保证对外抛出去的是CustomObservable


CustomObservableCreate就是上游的执行事件者,负责封装一层CustomObservableOnSubscribe


// 上游封装了subscriber 

public class CustomObservableCreate T extends CustomObservable {

 private CustomObservableOnSubscribe T subscriber; 

 public CustomObservableCreate(CustomObservableOnSubscribe T subscriber) {

 this.subscriber = subscriber; 

 @Override 

 protected void subscribeActual(CustomObserver observer) {

 CustomCreateEmitter emitter = new CustomCreateEmitter T (observer); 

 observer.onStart(); 

 // 真正执行耗时方法 

 subscriber.subscribe(emitter); 

 // 下游封装了CustomObserver 

 private static class CustomCreateEmitter T implements CustomEmitter T {

 private CustomObserver ? super T observer; 

 CustomCreateEmitter(CustomObserver ? super T observer) {

 this.observer = observer; 

 @Override 

 public void onNext(T o) {

 observer.onNext(o); 

 @Override 

 public void onError(Throwable e) {

 observer.onError(e); 

 @Override 

 public void onComplete() {

 observer.onComplete(); 

测试一下
public void testCreate() {

 CustomObservable.create(new CustomObservableOnSubscribe String () {

 @Override 

 public void subscribe(CustomEmitter String emitter) {

 emitter.onNext("test create"); 

 emitter.onComplete(); 

 }).subscribe(ExampleUnitTest. String getObserver()); 

public static T CustomObserver getObserver() {

 CustomObserver T observer = new CustomObserver T () {

 @Override 

 public void onStart() {

 System.out.println("==== start " + Thread.currentThread() + " ===="); 

 @Override 

 public void onNext(T t) {

 System.out.println(Thread.currentThread() + " next: " + t); 

 @Override 

 public void onError(Throwable e) {

 System.out.println(Thread.currentThread() + " error: " + e); 

 @Override 

 public void onComplete() {

 System.out.println("==== " + Thread.currentThread() + " complete ==== /n"); 

 return observer; 

测试结果:

==== start Thread[main,5,main] ====
Thread[main,5,main] next: test create
==== Thread[main,5,main] complete ====

操作符 map

rxjava的强大有一方面就在于它丰富的操作符,其中常用之一的就是map

map的作用是在下游的接收事件者(观察者),将返回的结果进行转换映射

定义一个CustomFunction负责数据转换

public interface CustomFunction T, R {

 R apply(T t); 

CustomObservable定义多一个map的静态方法

public R CustomObservable R map(CustomFunction T, R function) {

 return new CustomObservableMap(this, function); 

CustomObservableMap

public class CustomObservableMap R, T extends CustomObservable {

 private CustomObservableSource T source; 

 private CustomFunction T, R mapper; 

 public CustomObservableMap(CustomObservableSource T source, CustomFunction T, R mapper) {

 this.source = source; 

 this.mapper = mapper; 

 @Override 

 protected void subscribeActual(CustomObserver observer) {

 CustomMapObserver T, R mapObserver = new CustomMapObserver(observer, mapper); 

 source.subscribe(mapObserver); 

 private static class CustomMapObserver T, R implements CustomObserver T {

 private CustomObserver R observer; 

 private CustomFunction T, R function; 

 public CustomMapObserver(CustomObserver R observer, CustomFunction T, R function) {

 this.observer = observer; 

 this.function = function; 

 @Override 

 public void onStart() {

 observer.onStart(); 

 @Override 

 public void onNext(T result) {

 // 做结果数据转换映射 

 observer.onNext(function.apply(result)); 

 @Override 

 public void onError(Throwable e) {

 observer.onError(e); 

 @Override 

 public void onComplete() {

 observer.onComplete(); 

测试一下
public void testMap() {

 CustomObservable.create(new CustomObservableOnSubscribe String () {

 @Override 

 public void subscribe(CustomEmitter String emitter) {

 emitter.onNext("test create"); 

 emitter.onComplete(); 

 }).map(new CustomFunction String, String () {

 @Override 

 public String apply(String s) {

 return "test map " + s; 

 }).subscribe(ExampleUnitTest. String getObserver()); 

测试结果

==== start Thread[main,5,main] ====
Thread[main,5,main] next: test map test create
==== Thread[main,5,main] complete ====

6253.html

app程序应用开发手机开发无线开发移动端开发