博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
RxJava2最强源码解析系列1:Observable
阅读量:7075 次
发布时间:2019-06-28

本文共 10555 字,大约阅读时间需要 35 分钟。

前言

本系列面向已经熟练使用RxJava2的朋友,如果你是RxJava初学者,那么可以移步 这篇经典之作。

开始

RxJava2对于RxJava2最大的区别就是根据是否支持背压(backpressure)将原来的Observable分为Observable(不支持背压)和Flowable(支持背压),二者提供的操作符方法是一致的,那么我们就从Observable开始入手解析。

先看下面这段代码:

Observable.create(new ObservableOnSubscribe
() { @Override public void subscribe(ObservableEmitter
e) throws Exception { e.onNext(1); e.onNext(5); e.onNext(10); e.onComplete(); } }).filter(new Predicate
() { @Override public boolean test(Integer integer) throws Exception { return integer < 10; } }).map(new Function
() { @Override public String apply(Integer integer) throws Exception { return "num: " + integer; } }).subscribe(new Observer
() { @Override public void onSubscribe(Disposable d) { } @Override public void onNext(String s) { System.out.println(s); } @Override public void onError(Throwable e) { e.printStackTrace(); } @Override public void onComplete() { System.out.println("onComplete"); } });复制代码

这段代码的处理很简单,涉及到了3个操作符,分别是create、filter、map,它们的作用分别是:

create:创建Observable,并定义数据发射逻辑
filter:过滤数据
map:数据转换
所以最终运行的结果是:

num: 1num: 5复制代码

源码分析

现在我们开始对上述例子进行分析,首先我们将链式写法分解:

Observable
observable1 = Observable.create(new ObservableOnSubscribe
() { @Override public void subscribe(ObservableEmitter
e) throws Exception { e.onNext(1); e.onNext(5); e.onNext(10); e.onComplete(); } }); Observable
observable2 = observable1.filter(new Predicate
() { @Override public boolean test(Integer integer) throws Exception { return integer < 10; } }); Observable
observable3 = observable2 .map(new Function
() { @Override public String apply(Integer integer) throws Exception { return "num: " + integer; } }); observable3.subscribe(new Observer
() { @Override public void onSubscribe(Disposable d) { } @Override public void onNext(String s) { System.out.println(s); } @Override public void onError(Throwable e) { e.printStackTrace(); } @Override public void onComplete() { System.out.println("onComplete"); } });复制代码

从上面可以看出每一个操作符都会返回一个新的Observable,最终与Observer订阅的是最后返回的Observable3,现在我们先看看observable3.subscribe(new Observer() {}):

Observable.java  public final void subscribe(Observer
observer) { ObjectHelper.requireNonNull(observer, "observer is null"); try { observer = RxJavaPlugins.onSubscribe(this, observer); ObjectHelper.requireNonNull(observer, "Plugin returned null Observer"); subscribeActual(observer); } catch (NullPointerException e) { // NOPMD throw e; } catch (Throwable e) { Exceptions.throwIfFatal(e); // can't call onError because no way to know if a Disposable has been set or not // can't call onSubscribe because the call might have set a Subscription already RxJavaPlugins.onError(e); NullPointerException npe = new NullPointerException("Actually not, but can't throw other exceptions due to RS"); npe.initCause(e); throw npe; } }复制代码

在RxJava的源码中会有很多RxJavaPlugins方法,这些是RxJava提供的一些钩子函数,它们不会影响代码的逻辑,它们的作用呢?个人认为是方便我们开发者进行一些Hook操作。

那么这个方法里的主要代码就只有这一行:

subscribeActual(observer);复制代码

而这个subscribeActual是一个抽象方法,需要每个具体的Observable去实现。从这里我们可以明白,所谓的订阅就是将Observer传入到Observable的subscribeActual(observer)方法中,具体的订阅逻辑就看具体实现。

那我们现在就看看observable3的具体实现,它是map这个操作符返回的,我们看看map方法:

public final 
Observable
map(Function
mapper) { ObjectHelper.requireNonNull(mapper, "mapper is null"); return RxJavaPlugins.onAssembly(new ObservableMap
(this, mapper)); }复制代码

除去RxJavaPlugins方法,这里创建了一个ObservableMap,所以observable3的类型就是ObservableMap,同时在创建的时候它引用了之前的那个Observable,我们看看其实现:

public final class ObservableMap
extends AbstractObservableWithUpstream
{ final Function
function; public ObservableMap(ObservableSource
source, Function
function) { super(source); this.function = function; } @Override public void subscribeActual(Observer
t) { source.subscribe(new MapObserver
(t, function)); }}复制代码

代码很简单,我们看到它的subscribeActual实现,它用前一个Observable去subscribe一个新的MapObserver,这里我们先不看MapObserver的实现,只需要知道它拥有前一个Observer的引用。我们前面知道subscribe方法会调用该Observable的subscribeActual方法,所以这里会调用它前一个Observable的subscribeActual方法,observable2是filter这个操作符返回的,那么我们看看代码:

public final Observable
filter(Predicate
predicate) { ObjectHelper.requireNonNull(predicate, "predicate is null"); return RxJavaPlugins.onAssembly(new ObservableFilter
(this, predicate)); }复制代码
public final class ObservableFilter
extends AbstractObservableWithUpstream
{ final Predicate
predicate; public ObservableFilter(ObservableSource
source, Predicate
predicate) { super(source); this.predicate = predicate; } @Override public void subscribeActual(Observer
s) { source.subscribe(new FilterObserver
(s, predicate)); }复制代码

可以看到filter的结构和map的是一模一样的,只是它们对应的Observer不同而已。那我们溯源到底,observable2是observable1调用filter返回的,所以这里的source是observable1,而observable1是create操作符返回的,我们看看create方法:

public static 
Observable
create(ObservableOnSubscribe
source) { ObjectHelper.requireNonNull(source, "source is null"); return RxJavaPlugins.onAssembly(new ObservableCreate
(source)); }复制代码
public final class ObservableCreate
extends Observable
{ final ObservableOnSubscribe
source; public ObservableCreate(ObservableOnSubscribe
source) { this.source = source; } @Override protected void subscribeActual(Observer
observer) { CreateEmitter
parent = new CreateEmitter
(observer); observer.onSubscribe(parent); try { source.subscribe(parent); } catch (Throwable ex) { Exceptions.throwIfFatal(ex); parent.onError(ex); } }复制代码

从ObservableCreate的subscribeActual方法中看到,它和map和filter的实现很相似,都包装observer后生产一个新的Observer,然后订阅,但是这里是整个RxJava调用链的源头了,这里的source不再是observable而是ObservableOnSubscribe,它是我们之前传入的匿名类。这里首先还是包装了一些observer,之后调用observer的onSubscribe方法,最后调用source的subscribe方法,这里看看我们之前传入的ObservableOnSubscribe类:

Observable
observable1 = Observable.create(new ObservableOnSubscribe
() { @Override public void subscribe(ObservableEmitter
e) throws Exception { e.onNext(1); e.onNext(5); e.onNext(10); e.onComplete(); } });复制代码

这里我们调用了ObservableEmitter的onNext发射数据,在它的内部会调用其引用的observer的onNext方法,从我们之前分析知道,这个调用链是:ObservableEmitter->FilterObserver->MapObserver->传入的Observer

现在我们可以看看FilterObserver内部是怎么实现的了:

static final class FilterObserver
extends BasicFuseableObserver
{ final Predicate
filter; FilterObserver(Observer
actual, Predicate
filter) { super(actual); this.filter = filter; } @Override public void onNext(T t) { if (sourceMode == NONE) { boolean b; try { b = filter.test(t); } catch (Throwable e) { fail(e); return; } if (b) { actual.onNext(t); } } else { actual.onNext(null); } } @Override public int requestFusion(int mode) { return transitiveBoundaryFusion(mode); } @Nullable @Override public T poll() throws Exception { for (;;) { T v = qs.poll(); if (v == null || filter.test(v)) { return v; } } } }复制代码

从onNext方法可以知道,它根据传入的Predicate的call方法判断是否要过滤数据,只有根据条件返回true的,才往下调用onNext。再看看MapObserver的实现:

static final class MapObserver
extends BasicFuseableObserver
{ final Function
mapper; MapObserver(Observer
actual, Function
mapper) { super(actual); this.mapper = mapper; } @Override public void onNext(T t) { if (done) { return; } if (sourceMode != NONE) { actual.onNext(null); return; } U v; try { v = ObjectHelper.requireNonNull(mapper.apply(t), "The mapper function returned a null value."); } catch (Throwable ex) { fail(ex); return; } actual.onNext(v); } @Override public int requestFusion(int mode) { return transitiveBoundaryFusion(mode); } @Nullable @Override public U poll() throws Exception { T t = qs.poll(); return t != null ? ObjectHelper.
requireNonNull(mapper.apply(t), "The mapper function returned a null value.") : null; } }复制代码

map操作符的逻辑是对象类型转换,所以这里面调用Function<? super T, ? extends U> mapper的apply方法来转换数据,最终调用它包装的Observer的onNext方法,到这里一次Observer链就走完了。

总结

到这里RxJava到基本调用流程就说完了,第一次接触RxJava源码的朋友可能会觉得有点绕,这里举一个形象的例子帮大家理解:

现在有三个工厂ABC:
A是生成原材料的工厂,相当于上面的Observable1(create);
B是负责加工的工厂,相当于上面的Observable2(filter);
C是负责加工的工厂,相当于上面的Observable3(map);
还有一个门店S:
S是销售的门店,相当于上面传入的Observer;

现在S需要制造商品,它找到了工厂C,告诉它你给我生产一个这样的东西(相当于上面订阅的Observer);

接着C发现自己没有这样的东西,但是它自己有加工厂,需要材料才能加工出来,所以它找到了B;
接着B也发现自己没有这样的东西,同样自己有加工厂,所以它找到了A;
最终由于A是原材料工厂,所以它将材料给了B;
B拿到材料后自己加工给了C,C拿到后自己再加工,最后给了门店S;

想了好久觉得这个例子是比较形象的,Observable的引用关系就像上面的工厂之间的关系,而获得原材料后加工就像层层包装的Observer,每个Observer里面做的事情就相当于加工;

各位同学细细品味

转载地址:http://plkml.baihongyu.com/

你可能感兴趣的文章
Exchange2010 dag 的部署
查看>>
Linux/UNIX的scp命令用法详解
查看>>
Eclipse(MyEclipse)插件Jigloo的下载与安装
查看>>
软件设计的思想与哲学
查看>>
非常实用的linux系统命令
查看>>
NFS在Centos 6.3下的安装
查看>>
git pull 和本地文件冲突解决
查看>>
iOS音频AAC视频H264编码 推流最佳方案
查看>>
python基础教程(第2版)第五章读后总结;
查看>>
关于在eclipse中使用tomcat的笔记
查看>>
Android自定义控件实现简单的轮播图控件
查看>>
centos 6.4下的samba服务器的构建
查看>>
持续交付:价值主张
查看>>
二进制、八进制、十进制、十六进制之间转换
查看>>
sqlmap 本地安装
查看>>
[计算机术语]缺省
查看>>
JS --事件
查看>>
printStream 和printWriter区别
查看>>
Centos6.6搭建中文版本的Cacti监控
查看>>
将整数n转换为以b进制的数
查看>>