//  本次源码分析基于rxjava 3.0 
//  使用最新的rxjava版本看这里 https://github.com/ReactiveX/RxAndroid
dependencies {
    implementation 'io.reactivex.rxjava3:rxandroid:3.0.2'
    implementation 'io.reactivex.rxjava3:rxjava:3.1.5'
}

先看看我们平时是如何使用的?

 Observable.just("one").subscribeOn(Schedulers.io())
            .observeOn(AndroidSchedulers.mainThread())
            .subscribe(object : Observer<String> {
                override fun onSubscribe(d: Disposable) {

                }

                override fun onNext(t: String) {
                      println("hhhhh")
                }

                override fun onError(e: Throwable) {

                }

                override fun onComplete() {

                }
            })

整体流程比较复杂,我们接下来通过多个UML图和代码一起来理解一下

类图

在这里插入图片描述
点击这里可以在线修改类图PlantUML

顺序图

在这里插入图片描述
点击这里可以在线修改顺序图PlantUML

  • 构建流 从1到6 是我们开发者编写rxjava代码的过程
  • 订阅流 从7到13 是rxjava内部注册订阅者的过程
  • 回调流 从14到16 是rxjava内部把产生的数据回调给订阅者的过程

接下来让我们通过代码加深理解, 因为代码是比较枯燥的,建议在代码分析过程中我们时而回顾一下类图和顺序图可以帮我们更好地理解整体思路

源码分析

构建流

1 Observable.just(“one”) → ObservableJust.java

 public static <@NonNull T> Observable<T> just(@NonNull T item) {
        Objects.requireNonNull(item, "item is null");
        // 这里返回的就是ObservableJust
        return RxJavaPlugins.onAssembly(new ObservableJust<>(item));
    }

2 ObservableJust.subscribeOn → ObservableSubscribeOn.java

ObservableJust.java中并没有subscribeOn方法,具体方法在其父类Observable中
ObservableJust.java
public final class ObservableJust<T> extends Observable<T> implements ScalarSupplier<T> {

    private final T value;
    public ObservableJust(final T value) {
        this.value = value;
    }

    @Override
    protected void subscribeActual(Observer<? super T> observer) {
        ScalarDisposable<T> sd = new ScalarDisposable<>(observer, value);
        observer.onSubscribe(sd);
        sd.run();
    }

    @Override
    public T get() {
        return value;
    }
}
 Observable.java
 public final Observable<T> subscribeOn(@NonNull Scheduler scheduler) {
        Objects.requireNonNull(scheduler, "scheduler is null");
        // 这里的this就是 ObservableJust, scheduler 是 Schedulers.io()
        return RxJavaPlugins.onAssembly(new ObservableSubscribeOn<>(this, scheduler));
    }

3 ObservableSubscribeOn.observeOn → ObservableObserveOn.java

ObservableSubscribeOn中并没有observeOn方法,具体方法在其父类Observable中

Observable.java

  public final Observable<T> observeOn(@NonNull Scheduler scheduler) {
        return observeOn(scheduler, false, bufferSize());
    }


  public final Observable<T> observeOn(@NonNull Scheduler scheduler, boolean delayError, int bufferSize) {
        /// this 是 ObservableSubscribeOn, scheduler 是AndroidSchedulers.mainThread()
        return RxJavaPlugins.onAssembly(new ObservableObserveOn<>(this, scheduler, delayError, bufferSize));
    }

4 ObservableObserveOn.subscribe → void

ObservableObserveOn中并没有subscribe方法,具体方法在其父类Observable中

Observable.java
  @Override
    public final void subscribe(@NonNull Observer<? super T> observer) {
        // ....
        try {
              // ....
            subscribeActual(observer);
        } catch (NullPointerException e) { // NOPMD
            throw e;
        } catch (Throwable e) {
           // ....
        }
    }

整体代码可以简化如下

  • new ObservableJust<>(item) 👇🏻👇🏻👇🏻
  • new ObservableSubscribeOn<>(this, scheduler) 👇🏻👇🏻👇🏻
  • new ObservableObserveOn<>(this, scheduler, delayError, bufferSize))

从上到下像套娃一样不断的把对象套进另一个对象里, 这里其实就是使用了装饰器模式 的设计模式

订阅流

1 ObservableObserveOn.subscribe → subscribeActual

ObservableObserveOn 并没有subscribe的具体实现所以去找其父类Observable,而Observable的subscribe实现是使用子类的subscribeActual实现

Observable.java
  @Override
    public final void subscribe(@NonNull Observer<? super T> observer) {
          // ...
          subscribeActual(observer);
          // ...
    }
ObservableObserveOn.java
@Override
    protected void subscribeActual(Observer<? super T> observer) {
        if (scheduler instanceof TrampolineScheduler) {
            source.subscribe(observer);
        } else {
            Scheduler.Worker w = scheduler.createWorker();
            // source 就是 ObservableSubscribeOn
            // w 是  AndroidSchedulers.mainThread()
            // observer 是我们自定义的 println("hhhhh") 的 Observer
            source.subscribe(new ObserveOnObserver<>(observer, w, delayError, bufferSize));
        }
    }

2 ObservableSubscribeOn.subscribe → subscribeActual

和ObservableObserveOn一样, ObservableSubscribeOn 并没有subscribe的具体实现所以去找其父类Observable,而Observable的subscribe实现是使用子类的subscribeActual实现

ObservableSubscribeOn.java
@Override
    public void subscribeActual(final Observer<? super T> observer) {
        // 这里的 observer 就是 ObserveOnObserver
        final SubscribeOnObserver<T> parent = new SubscribeOnObserver<>(observer);
        // 触发我们自定义的 observer 执行 onSubscribe
        observer.onSubscribe(parent);

        // scheduler 是 Schedulers.io()  → IoScheduler()
        //  parent 是 SubscribeOnObserver
        parent.setDisposable(scheduler.scheduleDirect(new SubscribeTask(parent)));
    }

IoScheduler没有实现scheduleDirect方法,所以去找其父类 Scheduler

Scheduler.java
  @NonNull
    public Disposable scheduleDirect(@NonNull Runnable run, long delay, @NonNull TimeUnit unit) {
        // 这里的createWorker() 又是抽象方法交给子类 IoScheduler实现
        final Worker w = createWorker();

        final Runnable decoratedRun = RxJavaPlugins.onSchedule(run);

        DisposeTask task = new DisposeTask(decoratedRun, w);
        // 如果深入这里会有点跑题,在这里我们简单理解成IO线程执行了我们的run即可
        w.schedule(task, delay, unit);

        return task;
    }

我们要run的是什么?

SubscribeTask.java
final class SubscribeTask implements Runnable {
        private final SubscribeOnObserver<T> parent;

        SubscribeTask(SubscribeOnObserver<T> parent) {
            this.parent = parent;
        }

        @Override
        public void run() {
             // parent 是 SubscribeOnObserver
            // source是  ObservableJust
            source.subscribe(parent);
        }
    }

3 ObservableJust.subscribe → subscribeActual

和ObservableObserveOn ObservableSubscribeOn一样, ObservableJust 并没有subscribe的具体实现所以去找其父类Observable,而Observable的subscribe实现是使用子类的subscribeActual实现

ObservableJust.java
 @Override
    protected void subscribeActual(Observer<? super T> observer) {
        // observer 是 SubscribeOnObserver
        // value 是 我们传进去的 "one"
        ScalarDisposable<T> sd = new ScalarDisposable<>(observer, value);
        observer.onSubscribe(sd);
        sd.run();
    }

整体代码简化如下

  • new ObserveOnObserver<>(observer, w, delayError, bufferSize)
  • new SubscribeOnObserver<>(observer)
  • ScalarDisposable sd = new ScalarDisposable<>(observer, value)

这里除了有我们提到的装饰器模式之外,还有一个模板模式的使用, 父类把架构搭建完毕, 然后子类去实现某个抽象的功能,比如 Observable的 subscribeActual / Scheduler的 createWorker

回调流

1 ScalarDisposable.run

@Override
        public void run() {
            if (get() == START && compareAndSet(START, ON_NEXT)) {
                ///  observer 是  SubscribeOnObserver
                ///   value   是  "one"
                observer.onNext(value);
                if (get() == ON_NEXT) {
                    lazySet(ON_COMPLETE);
                    observer.onComplete();
                }
            }
        }

2 SubscribeOnObserver.onNext

 SubscribeOnObserver(Observer<? super T> downstream) {
            /// downstream 其实就是 ObserveOnObserver
            this.downstream = downstream;
            this.upstream = new AtomicReference<>();
        }
        
 @Override
 public void onNext(T t) {
      /// ObserveOnObserver 的 onNext
      downstream.onNext(t);
 }

3 ObserveOnObserver.onNext

@Override
 public void onNext(T t) {
      if (done) {
         return;
       }

     // 把元素先放入队列中
     if (sourceMode != QueueDisposable.ASYNC) {
                queue.offer(t);
     }
     /// 这里开始线程切换的骚操作了
     schedule();
}

void schedule() {
   if (getAndIncrement() == 0) {
       //worker → AndroidSchedulers.mainThread() →  HandlerScheduler → HandlerWorker 把线程从io切换到main
       worker.schedule(this);
   }
}

@Override
 public void run() {
     if (outputFused) {
        drainFused();
     } else {
        drainNormal();
     }
}
void drainNormal() {
   int missed = 1;
   final SimpleQueue<T> q = queue;
   /// downstream 就是我们自定义的Observer
   final Observer<? super T> a = downstream;
   for (;;) {
      for (;;) {
                 boolean d = done;
                    T v;
                    // ...
                    v = q.poll();
                   // 这里把"one"回调给我们的观察者
                    a.onNext(v);
                }

                missed = addAndGet(-missed);
                if (missed == 0) {
                    break;
                }
            }
        }

整体代码简化如下:

  • io线程 ScalarDisposable.run
  • io线程 SubscribeOnObserver.onNext
  • io线程 ObserveOnObserver.onNext
  • main线程 自定义Observer.onNext
    在这里插入图片描述

最后让我们再次总结构建过程的方法的作用

  • Observable.just 不关心自身在什么线程, 一门心思生产数据
  • subscribeOn 决定在哪个线程生产数据
  • observeOn 决定在哪个线程消费数据
  • subscribe 唤醒整个订阅和回调流 (如缺少此步则整个构建过程无效)

留给读者的问题

observeOn和subscribeOn可以颠倒顺序吗?

.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())

改为  

.observeOn(AndroidSchedulers.mainThread())
.subscribeOn(Schedulers.io())

请问Observer的onNext执行在什么线程呢?
Logo

开放原子开发者工作坊旨在鼓励更多人参与开源活动,与志同道合的开发者们相互交流开发经验、分享开发心得、获取前沿技术趋势。工作坊有多种形式的开发者活动,如meetup、训练营等,主打技术交流,干货满满,真诚地邀请各位开发者共同参与!

更多推荐