Rxjava线程切换原理终于在2023年有了答案。
虽然项目中有用到Rxjava但是从来没有花功夫研究过所以对这里一直是懵逼状态,面试的时候也是很恐惧这方面的提问,但是Rxjava线程切换原理一直是面试必问的问题,与其心存侥幸不如直面恐惧,到了2022年,终于对这个问题有了一个完整的理解,如有理解偏差,还请指出。//本次源码分析基于rxjava 3.0//使用最新的rxjava版本看这里 https://github.com/ReactiveX/R
文章目录
// 本次源码分析基于rxjava 3.0
// 使用最新的rxjava版本看这里 https://github.com/ReactiveX/RxAndroid
dependencies {
implementation 'io.reactivex.rxjava3:rxandroid:3.0.2'
implementation 'io.reactivex.rxjava3:rxjava:3.1.5'
}
先看看我们平时是如何使用的?
Observable.just("one").subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(object : Observer<String> {
override fun onSubscribe(d: Disposable) {
}
override fun onNext(t: String) {
println("hhhhh")
}
override fun onError(e: Throwable) {
}
override fun onComplete() {
}
})
整体流程比较复杂,我们接下来通过多个UML图和代码一起来理解一下
类图
顺序图
- 构建流 从1到6 是我们开发者编写rxjava代码的过程
- 订阅流 从7到13 是rxjava内部注册订阅者的过程
- 回调流 从14到16 是rxjava内部把产生的数据回调给订阅者的过程
接下来让我们通过代码加深理解, 因为代码是比较枯燥的,建议在代码分析过程中我们时而回顾一下类图和顺序图可以帮我们更好地理解整体思路
源码分析
构建流
1 Observable.just(“one”) → ObservableJust.java
public static <@NonNull T> Observable<T> just(@NonNull T item) {
Objects.requireNonNull(item, "item is null");
// 这里返回的就是ObservableJust
return RxJavaPlugins.onAssembly(new ObservableJust<>(item));
}
2 ObservableJust.subscribeOn → ObservableSubscribeOn.java
ObservableJust.java中并没有subscribeOn方法,具体方法在其父类Observable中
ObservableJust.java
public final class ObservableJust<T> extends Observable<T> implements ScalarSupplier<T> {
private final T value;
public ObservableJust(final T value) {
this.value = value;
}
@Override
protected void subscribeActual(Observer<? super T> observer) {
ScalarDisposable<T> sd = new ScalarDisposable<>(observer, value);
observer.onSubscribe(sd);
sd.run();
}
@Override
public T get() {
return value;
}
}
Observable.java
public final Observable<T> subscribeOn(@NonNull Scheduler scheduler) {
Objects.requireNonNull(scheduler, "scheduler is null");
// 这里的this就是 ObservableJust, scheduler 是 Schedulers.io()
return RxJavaPlugins.onAssembly(new ObservableSubscribeOn<>(this, scheduler));
}
3 ObservableSubscribeOn.observeOn → ObservableObserveOn.java
ObservableSubscribeOn中并没有observeOn方法,具体方法在其父类Observable中
Observable.java
public final Observable<T> observeOn(@NonNull Scheduler scheduler) {
return observeOn(scheduler, false, bufferSize());
}
public final Observable<T> observeOn(@NonNull Scheduler scheduler, boolean delayError, int bufferSize) {
/// this 是 ObservableSubscribeOn, scheduler 是AndroidSchedulers.mainThread()
return RxJavaPlugins.onAssembly(new ObservableObserveOn<>(this, scheduler, delayError, bufferSize));
}
4 ObservableObserveOn.subscribe → void
ObservableObserveOn中并没有subscribe方法,具体方法在其父类Observable中
Observable.java
@Override
public final void subscribe(@NonNull Observer<? super T> observer) {
// ....
try {
// ....
subscribeActual(observer);
} catch (NullPointerException e) { // NOPMD
throw e;
} catch (Throwable e) {
// ....
}
}
整体代码可以简化如下
- new ObservableJust<>(item) 👇🏻👇🏻👇🏻
- new ObservableSubscribeOn<>(this, scheduler) 👇🏻👇🏻👇🏻
- new ObservableObserveOn<>(this, scheduler, delayError, bufferSize))
从上到下像套娃一样不断的把对象套进另一个对象里, 这里其实就是使用了装饰器模式 的设计模式
订阅流
1 ObservableObserveOn.subscribe → subscribeActual
ObservableObserveOn 并没有subscribe的具体实现所以去找其父类Observable,而Observable的subscribe实现是使用子类的subscribeActual实现
Observable.java
@Override
public final void subscribe(@NonNull Observer<? super T> observer) {
// ...
subscribeActual(observer);
// ...
}
ObservableObserveOn.java
@Override
protected void subscribeActual(Observer<? super T> observer) {
if (scheduler instanceof TrampolineScheduler) {
source.subscribe(observer);
} else {
Scheduler.Worker w = scheduler.createWorker();
// source 就是 ObservableSubscribeOn
// w 是 AndroidSchedulers.mainThread()
// observer 是我们自定义的 println("hhhhh") 的 Observer
source.subscribe(new ObserveOnObserver<>(observer, w, delayError, bufferSize));
}
}
2 ObservableSubscribeOn.subscribe → subscribeActual
和ObservableObserveOn一样, ObservableSubscribeOn 并没有subscribe的具体实现所以去找其父类Observable,而Observable的subscribe实现是使用子类的subscribeActual实现
ObservableSubscribeOn.java
@Override
public void subscribeActual(final Observer<? super T> observer) {
// 这里的 observer 就是 ObserveOnObserver
final SubscribeOnObserver<T> parent = new SubscribeOnObserver<>(observer);
// 触发我们自定义的 observer 执行 onSubscribe
observer.onSubscribe(parent);
// scheduler 是 Schedulers.io() → IoScheduler()
// parent 是 SubscribeOnObserver
parent.setDisposable(scheduler.scheduleDirect(new SubscribeTask(parent)));
}
IoScheduler没有实现scheduleDirect方法,所以去找其父类 Scheduler
Scheduler.java
@NonNull
public Disposable scheduleDirect(@NonNull Runnable run, long delay, @NonNull TimeUnit unit) {
// 这里的createWorker() 又是抽象方法交给子类 IoScheduler实现
final Worker w = createWorker();
final Runnable decoratedRun = RxJavaPlugins.onSchedule(run);
DisposeTask task = new DisposeTask(decoratedRun, w);
// 如果深入这里会有点跑题,在这里我们简单理解成IO线程执行了我们的run即可
w.schedule(task, delay, unit);
return task;
}
我们要run的是什么?
SubscribeTask.java
final class SubscribeTask implements Runnable {
private final SubscribeOnObserver<T> parent;
SubscribeTask(SubscribeOnObserver<T> parent) {
this.parent = parent;
}
@Override
public void run() {
// parent 是 SubscribeOnObserver
// source是 ObservableJust
source.subscribe(parent);
}
}
3 ObservableJust.subscribe → subscribeActual
和ObservableObserveOn ObservableSubscribeOn一样, ObservableJust 并没有subscribe的具体实现所以去找其父类Observable,而Observable的subscribe实现是使用子类的subscribeActual实现
ObservableJust.java
@Override
protected void subscribeActual(Observer<? super T> observer) {
// observer 是 SubscribeOnObserver
// value 是 我们传进去的 "one"
ScalarDisposable<T> sd = new ScalarDisposable<>(observer, value);
observer.onSubscribe(sd);
sd.run();
}
整体代码简化如下
- new ObserveOnObserver<>(observer, w, delayError, bufferSize)
- new SubscribeOnObserver<>(observer)
- ScalarDisposable sd = new ScalarDisposable<>(observer, value)
这里除了有我们提到的装饰器模式之外,还有一个模板模式的使用, 父类把架构搭建完毕, 然后子类去实现某个抽象的功能,比如 Observable的 subscribeActual / Scheduler的 createWorker
回调流
1 ScalarDisposable.run
@Override
public void run() {
if (get() == START && compareAndSet(START, ON_NEXT)) {
/// observer 是 SubscribeOnObserver
/// value 是 "one"
observer.onNext(value);
if (get() == ON_NEXT) {
lazySet(ON_COMPLETE);
observer.onComplete();
}
}
}
2 SubscribeOnObserver.onNext
SubscribeOnObserver(Observer<? super T> downstream) {
/// downstream 其实就是 ObserveOnObserver
this.downstream = downstream;
this.upstream = new AtomicReference<>();
}
@Override
public void onNext(T t) {
/// ObserveOnObserver 的 onNext
downstream.onNext(t);
}
3 ObserveOnObserver.onNext
@Override
public void onNext(T t) {
if (done) {
return;
}
// 把元素先放入队列中
if (sourceMode != QueueDisposable.ASYNC) {
queue.offer(t);
}
/// 这里开始线程切换的骚操作了
schedule();
}
void schedule() {
if (getAndIncrement() == 0) {
//worker → AndroidSchedulers.mainThread() → HandlerScheduler → HandlerWorker 把线程从io切换到main
worker.schedule(this);
}
}
@Override
public void run() {
if (outputFused) {
drainFused();
} else {
drainNormal();
}
}
void drainNormal() {
int missed = 1;
final SimpleQueue<T> q = queue;
/// downstream 就是我们自定义的Observer
final Observer<? super T> a = downstream;
for (;;) {
for (;;) {
boolean d = done;
T v;
// ...
v = q.poll();
// 这里把"one"回调给我们的观察者
a.onNext(v);
}
missed = addAndGet(-missed);
if (missed == 0) {
break;
}
}
}
整体代码简化如下:
- io线程 ScalarDisposable.run
- io线程 SubscribeOnObserver.onNext
- io线程 ObserveOnObserver.onNext
- main线程 自定义Observer.onNext
最后让我们再次总结构建过程的方法的作用
- Observable.just 不关心自身在什么线程, 一门心思生产数据
- subscribeOn 决定在哪个线程生产数据
- observeOn 决定在哪个线程消费数据
- subscribe 唤醒整个订阅和回调流 (如缺少此步则整个构建过程无效)
留给读者的问题
observeOn和subscribeOn可以颠倒顺序吗?
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
改为
.observeOn(AndroidSchedulers.mainThread())
.subscribeOn(Schedulers.io())
请问Observer的onNext执行在什么线程呢?
开放原子开发者工作坊旨在鼓励更多人参与开源活动,与志同道合的开发者们相互交流开发经验、分享开发心得、获取前沿技术趋势。工作坊有多种形式的开发者活动,如meetup、训练营等,主打技术交流,干货满满,真诚地邀请各位开发者共同参与!
更多推荐
所有评论(0)