前言
本系列面向已经熟练使用RxJava2的朋友,如果你是RxJava初学者,那么可以移步 这篇经典之作。
开始
RxJava2对于RxJava2最大的区别就是根据是否支持背压(backpressure)将原来的Observable分为Observable(不支持背压)和Flowable(支持背压),二者提供的操作符方法是一致的,那么我们就从Observable开始入手解析。
先看下面这段代码:
Observable.create(new ObservableOnSubscribe() { @Override public void subscribe(ObservableEmitter e) throws Exception { e.onNext(1); e.onNext(5); e.onNext(10); e.onComplete(); } }).filter(new Predicate () { @Override public boolean test(Integer integer) throws Exception { return integer < 10; } }).map(new Function () { @Override public String apply(Integer integer) throws Exception { return "num: " + integer; } }).subscribe(new Observer () { @Override public void onSubscribe(Disposable d) { } @Override public void onNext(String s) { System.out.println(s); } @Override public void onError(Throwable e) { e.printStackTrace(); } @Override public void onComplete() { System.out.println("onComplete"); } });复制代码
这段代码的处理很简单,涉及到了3个操作符,分别是create、filter、map,它们的作用分别是:
create:创建Observable,并定义数据发射逻辑filter:过滤数据map:数据转换所以最终运行的结果是:num: 1num: 5复制代码
源码分析
现在我们开始对上述例子进行分析,首先我们将链式写法分解:
Observableobservable1 = Observable.create(new ObservableOnSubscribe () { @Override public void subscribe(ObservableEmitter e) throws Exception { e.onNext(1); e.onNext(5); e.onNext(10); e.onComplete(); } }); Observable observable2 = observable1.filter(new Predicate () { @Override public boolean test(Integer integer) throws Exception { return integer < 10; } }); Observable observable3 = observable2 .map(new Function () { @Override public String apply(Integer integer) throws Exception { return "num: " + integer; } }); observable3.subscribe(new Observer () { @Override public void onSubscribe(Disposable d) { } @Override public void onNext(String s) { System.out.println(s); } @Override public void onError(Throwable e) { e.printStackTrace(); } @Override public void onComplete() { System.out.println("onComplete"); } });复制代码
从上面可以看出每一个操作符都会返回一个新的Observable,最终与Observer订阅的是最后返回的Observable3,现在我们先看看observable3.subscribe(new Observer() {}):
Observable.java public final void subscribe(Observer observer) { ObjectHelper.requireNonNull(observer, "observer is null"); try { observer = RxJavaPlugins.onSubscribe(this, observer); ObjectHelper.requireNonNull(observer, "Plugin returned null Observer"); subscribeActual(observer); } catch (NullPointerException e) { // NOPMD throw e; } catch (Throwable e) { Exceptions.throwIfFatal(e); // can't call onError because no way to know if a Disposable has been set or not // can't call onSubscribe because the call might have set a Subscription already RxJavaPlugins.onError(e); NullPointerException npe = new NullPointerException("Actually not, but can't throw other exceptions due to RS"); npe.initCause(e); throw npe; } }复制代码
在RxJava的源码中会有很多RxJavaPlugins方法,这些是RxJava提供的一些钩子函数,它们不会影响代码的逻辑,它们的作用呢?个人认为是方便我们开发者进行一些Hook操作。
那么这个方法里的主要代码就只有这一行:
subscribeActual(observer);复制代码
而这个subscribeActual是一个抽象方法,需要每个具体的Observable去实现。从这里我们可以明白,所谓的订阅就是将Observer传入到Observable的subscribeActual(observer)方法中,具体的订阅逻辑就看具体实现。
那我们现在就看看observable3的具体实现,它是map这个操作符返回的,我们看看map方法:
public finalObservable map(Function mapper) { ObjectHelper.requireNonNull(mapper, "mapper is null"); return RxJavaPlugins.onAssembly(new ObservableMap (this, mapper)); }复制代码
除去RxJavaPlugins方法,这里创建了一个ObservableMap,所以observable3的类型就是ObservableMap,同时在创建的时候它引用了之前的那个Observable,我们看看其实现:
public final class ObservableMapextends AbstractObservableWithUpstream { final Function function; public ObservableMap(ObservableSource source, Function function) { super(source); this.function = function; } @Override public void subscribeActual(Observer t) { source.subscribe(new MapObserver (t, function)); }}复制代码
代码很简单,我们看到它的subscribeActual实现,它用前一个Observable去subscribe一个新的MapObserver,这里我们先不看MapObserver的实现,只需要知道它拥有前一个Observer的引用。我们前面知道subscribe方法会调用该Observable的subscribeActual方法,所以这里会调用它前一个Observable的subscribeActual方法,observable2是filter这个操作符返回的,那么我们看看代码:
public final Observablefilter(Predicate predicate) { ObjectHelper.requireNonNull(predicate, "predicate is null"); return RxJavaPlugins.onAssembly(new ObservableFilter (this, predicate)); }复制代码
public final class ObservableFilterextends AbstractObservableWithUpstream { final Predicate predicate; public ObservableFilter(ObservableSource source, Predicate predicate) { super(source); this.predicate = predicate; } @Override public void subscribeActual(Observer s) { source.subscribe(new FilterObserver (s, predicate)); }复制代码
可以看到filter的结构和map的是一模一样的,只是它们对应的Observer不同而已。那我们溯源到底,observable2是observable1调用filter返回的,所以这里的source是observable1,而observable1是create操作符返回的,我们看看create方法:
public staticObservable create(ObservableOnSubscribe source) { ObjectHelper.requireNonNull(source, "source is null"); return RxJavaPlugins.onAssembly(new ObservableCreate (source)); }复制代码
public final class ObservableCreateextends Observable { final ObservableOnSubscribe source; public ObservableCreate(ObservableOnSubscribe source) { this.source = source; } @Override protected void subscribeActual(Observer observer) { CreateEmitter parent = new CreateEmitter (observer); observer.onSubscribe(parent); try { source.subscribe(parent); } catch (Throwable ex) { Exceptions.throwIfFatal(ex); parent.onError(ex); } }复制代码
从ObservableCreate的subscribeActual方法中看到,它和map和filter的实现很相似,都包装observer后生产一个新的Observer,然后订阅,但是这里是整个RxJava调用链的源头了,这里的source不再是observable而是ObservableOnSubscribe,它是我们之前传入的匿名类。这里首先还是包装了一些observer,之后调用observer的onSubscribe方法,最后调用source的subscribe方法,这里看看我们之前传入的ObservableOnSubscribe类:
Observableobservable1 = Observable.create(new ObservableOnSubscribe () { @Override public void subscribe(ObservableEmitter e) throws Exception { e.onNext(1); e.onNext(5); e.onNext(10); e.onComplete(); } });复制代码
这里我们调用了ObservableEmitter的onNext发射数据,在它的内部会调用其引用的observer的onNext方法,从我们之前分析知道,这个调用链是:ObservableEmitter->FilterObserver->MapObserver->传入的Observer
现在我们可以看看FilterObserver内部是怎么实现的了:
static final class FilterObserverextends BasicFuseableObserver { final Predicate filter; FilterObserver(Observer actual, Predicate filter) { super(actual); this.filter = filter; } @Override public void onNext(T t) { if (sourceMode == NONE) { boolean b; try { b = filter.test(t); } catch (Throwable e) { fail(e); return; } if (b) { actual.onNext(t); } } else { actual.onNext(null); } } @Override public int requestFusion(int mode) { return transitiveBoundaryFusion(mode); } @Nullable @Override public T poll() throws Exception { for (;;) { T v = qs.poll(); if (v == null || filter.test(v)) { return v; } } } }复制代码
从onNext方法可以知道,它根据传入的Predicate的call方法判断是否要过滤数据,只有根据条件返回true的,才往下调用onNext。再看看MapObserver的实现:
static final class MapObserverextends BasicFuseableObserver { final Function mapper; MapObserver(Observer actual, Function mapper) { super(actual); this.mapper = mapper; } @Override public void onNext(T t) { if (done) { return; } if (sourceMode != NONE) { actual.onNext(null); return; } U v; try { v = ObjectHelper.requireNonNull(mapper.apply(t), "The mapper function returned a null value."); } catch (Throwable ex) { fail(ex); return; } actual.onNext(v); } @Override public int requestFusion(int mode) { return transitiveBoundaryFusion(mode); } @Nullable @Override public U poll() throws Exception { T t = qs.poll(); return t != null ? ObjectHelper. requireNonNull(mapper.apply(t), "The mapper function returned a null value.") : null; } }复制代码
map操作符的逻辑是对象类型转换,所以这里面调用Function<? super T, ? extends U> mapper的apply方法来转换数据,最终调用它包装的Observer的onNext方法,到这里一次Observer链就走完了。
总结
到这里RxJava到基本调用流程就说完了,第一次接触RxJava源码的朋友可能会觉得有点绕,这里举一个形象的例子帮大家理解:
现在有三个工厂ABC:A是生成原材料的工厂,相当于上面的Observable1(create);B是负责加工的工厂,相当于上面的Observable2(filter);C是负责加工的工厂,相当于上面的Observable3(map);还有一个门店S:S是销售的门店,相当于上面传入的Observer;现在S需要制造商品,它找到了工厂C,告诉它你给我生产一个这样的东西(相当于上面订阅的Observer);
接着C发现自己没有这样的东西,但是它自己有加工厂,需要材料才能加工出来,所以它找到了B;接着B也发现自己没有这样的东西,同样自己有加工厂,所以它找到了A;最终由于A是原材料工厂,所以它将材料给了B;B拿到材料后自己加工给了C,C拿到后自己再加工,最后给了门店S;想了好久觉得这个例子是比较形象的,Observable的引用关系就像上面的工厂之间的关系,而获得原材料后加工就像层层包装的Observer,每个Observer里面做的事情就相当于加工;
各位同学细细品味