一、背景

    RxJS是ReactiveX编程理念的JavaScript版本。ReactiveX来自微软,它是一种针对异步数据流的编程。简单来说,它将一切数据,包括HTTP请求,DOM事件或者普通数据等包装成流的形式,然后用强大丰富的操作符对流进行处理,使你能以同步编程的方式处理异步数据,并组合不同的操作符来轻松优雅的实现你所需要的功能。更简洁地实现异步和事件驱动应用
    ReactiveX结合了Observer模式、Iterator模式和函数式编程和集合来构建一个管理事件序列的理想方式。

图 1.1 ReactiveX的语言(ReactiveX官网)

版本迭代

最新版本 7.0.0-beta.12

二、技术特点

  • 可组合
    而Observables是专门设计用来组合异步数据流的。

  • 弹性强(灵活)
    支持单标量值,而且支持数据序列甚至是无限的数据流。

  • 具体实现没有限制
    没有对任何并发和异步库的依赖,可以用线程池、事件循环、非阻塞I/O、actors等任何技术来实现,用任何适应的风格。Observable应该永远被认为是异步的,无论底层实现是阻塞还是非阻塞的。可以随时修改实现而不影响使用。

  • Callbacks的问题
    对于单层异步任务,简单易用。而对于需要多层嵌套的场景就难于处理了。

三、主要技术

3.1 基本概念

在RxJS中管理异步事件的基本概念如下:

  • :具有方向和分支的事件
  • Observable:代表了一个调用未来值或事件的集合
  • Observer:代表了一个知道如何监听Observable传递过来的值的回调集合
  • Subscription:代表了一个可执行的Observable,主要是用于取消执行
  • Subject:相当于一个EventEmitter,它的唯一的方法是广播一个值或事件给多个Observer
  • Operators:是一个纯函数,允许处理集合与函数式编程风格的操作,比如map、filter、concat、flatMap等
  • Schedulers:是一个集中式调度程序来控制并发性,允许我们在setTimeout或者requestAnimationFrame上进行协调计算

3.2 例子

3.2.1 定时器

需求:从0开始按顺序每秒输出一个整数

常规解决方法:

function CountCase1() {
    let count = 0;
    let timer = setInterval(() => {
        count += 1;
        console.log(`Count case1: ${count}`);
    }, 1000);
}
CountCase1();

Rxjs解决方案:

import { interval } from 'rxjs';
import { take } from 'rxjs/operators';

function CountCase2() {
    return interval(1000).pipe(take(1000));
}
CountCase2().subscribe((num) => console.log('Count case2: ', num));

3.2.2 创建一个Observable

import { Observable } from 'rxjs';

function Create() {
    return Observable.create((observer: any) => {
        observer.next(1);
        observer.next(2);
        observer.next(3);
        setTimeout(() => {
            observer.next(4);
            observer.complete();
        }, 1000);
    });
}

Create().subscribe({
    next: (x: any) => console.log(`got value${x}`),
    error: (err: any) => console.error(`somthing wrong occurred: ${err}`),
    complete: () => console.log('done')
});

console.log('just after subscribe');

结论

  • 创建的Observable对象是异步的
  • 调用方式类似于函数调用
  • Observable对象能够返回多个值

3.2.3 取消一个Observable

   Subscription代表了一个一次性的资源,通常表示的是一个Observable execution。一个Subscription有一个重要的方法,unsubscribe,它不需要参数,仅仅是处理subscription的资源。在之前的RxJS版本中,Subscription被称作"Disposable"。
一个Subscription实质上是一个unsubscribe()函数,用来释放资源或者取消一个Observable executions。

import { Observable } from 'rxjs';

function Create() {
    return Observable.create((observer: any) => {
        observer.next(1);
        observer.next(2);
        observer.next(3);
        setTimeout(() => {
            observer.next(4);
            observer.complete();
        }, 1000);
    });
}

const subscription = Create().subscribe({
    next: (x: any) => console.log(`got value${x}`),
    error: (err: any) => console.error(`somthing wrong occurred: ${err}`),
    complete: () => console.log('done')
});

subscription.unsubscribe();

console.log('just after subscribe');

3.2.4 多路广播

   一个RxJS Subject是一个特殊类型的Observable,它允许值可以多路广播给多个Observers。普通的Observables是单路广播(每个subscribed Observer拥有自己独立的Observable execution),Subjects是多路广播。

import { Subject } from 'rxjs';

function Create() {
    return new Subject();
}

const subject = Create();

subject.subscribe({
    next: (x: any) => console.log(`got value in case 1: ${x}`),
    error: (err: any) => console.error(`somthing wrong occurred in case 1: ${err}`),
    complete: () => console.log('in case 1 done')
})

subject.subscribe({
    next: (x: any) => console.log(`got value in case 2: ${x}`),
    error: (err: any) => console.error(`somthing wrong occurred in case 2: ${err}`),
    complete: () => console.log('in case 2 done')
})

subject.next(1);
subject.next(2);

3.2.5 Operators

   Opeartors是Obsrevable的方法,就像map()、filter()、merge()等。当它被调用时,它们并不改变已经存在的Observable,而是返回一个基于第一个Observable上新的Observable。

import { of, forkJoin, Observable, throwError, race } from 'rxjs';

function Create1(): Observable<string> {
    return of('Operater Demo 1');
}

function Create2(): Observable<string> {
    return of('Operater Demo 2');
}

function Create3(): Observable<string> {
    return Observable.create((observer: any) => {
        observer.next(1);
        observer.next(2);
        observer.next(3);
        setTimeout(() => {
            observer.next(4);
            observer.complete();
        }, 1000);
    });
}

返回一个字符串

const subject = Create1();
subject.subscribe({
    next: (x: any) => console.log(`got string: ${x}`),
    error: (err: any) => console.error(`somthing wrong occurred: ${err}`),
    complete: () => console.log('done')
})

对比Promise

  • 并发请求
const subject1 = Create1();
const subject2 = Create2();
const subject3 = Create3();
console.log('start');
forkJoin([subject1, subject2]).subscribe(
    (data: Array<string>) => {
        data.map(item => console.log(item));
});
console.log('done');
  • 竞争请求
console.log('start');
race([subject1, subject2]).subscribe(
    (data: string) => {
        console.log(data);
});
console.log('done');
  • 对于持续流的处理
console.log('start');
forkJoin([subject1, subject3]).subscribe({
    next: (data: Array<string>) => {
        data.map(item => console.log(item));
}});
console.log('done');
  • 对于并发异常对处理
import { Observable, of, throwError } from 'rxjs';
import { catchError, map, mergeAll } from "rxjs/operators";

function Create(): Observable<any> {
    return of([
        1,
        2,
        () => throwError('exception.'),
        3,
    ]).pipe(
        map(val => val),
        mergeAll(),
        catchError(() => of('error catched.'))
    );
}

var observable = Create()

var finalObserver = {
    next: (x: any) => console.log('got value ' + x),
    error: (err: any) => console.error('something wrong occurred: ' + err),
    complete: () => console.log('done')
};

console.log('just before subscribe');
observable.subscribe(finalObserver);
console.log('just after subscribe');

3.2.6 Scheduler

当一个subscription开始工作或者notifications被传递,scheduler就会开始调用。

import { Observable, asyncScheduler } from 'rxjs';
import { observeOn } from "rxjs/operators";

function Create(): Observable<any> {
    return Observable.create((proxyObserver: any) => {
        proxyObserver.next(1);
        proxyObserver.next(2);
        proxyObserver.next(3);
        proxyObserver.complete();
    }).pipe(
        observeOn(asyncScheduler)
    );
}

var observable = Create()

var finalObserver = {
    next: (x: any) => console.log('got value ' + x),
    error: (err: any) => console.error('something wrong occurred: ' + err),
    complete: () => console.log('done')
};

console.log('just before subscribe');
observable.subscribe(finalObserver);
console.log('just after subscribe');

参考资料

[1] ReactiveX简介
[2] rxjs
[3] RxJS进阶——关于流的理解和应用
[4] ReactiveX官网
[5] Rxjs源码
[6] Rxjs中文文档

Logo

开放原子开发者工作坊旨在鼓励更多人参与开源活动,与志同道合的开发者们相互交流开发经验、分享开发心得、获取前沿技术趋势。工作坊有多种形式的开发者活动,如meetup、训练营等,主打技术交流,干货满满,真诚地邀请各位开发者共同参与!

更多推荐