angular官方文档RxJS 库
RxJS库官方文档

RxJS库

RxJS是ReactiveX编程理念的JavaScript版本。ReactiveX来自微软,它是一种针对异步数据流的编程。简单来说,它将一切数据,包括HTTP请求,DOM事件或者普通数据等包装成流的形式,然后用强大丰富的操作符对流进行处理,使你能以同步编程的方式处理异步数据,并组合不同的操作符来轻松优雅的实现你所需要的功能。

响应式编程是一种面向数据流和变更传播的异步编程范式(Wikipedia)。RxJS(响应式扩展的 JavaScript 版)是一个使用可观察对象进行响应式编程的库,它让组合异步代码和基于回调的代码变得更简单。

RxJS 提供了一种对 Observable 类型的实现,直到 Observable 成为了 JavaScript 语言的一部分并且浏览器支持它之前,它都是必要的。这个库还提供了一些工具函数,用于创建和使用可观察对象。这些工具函数可用于:

  • 把现有的异步代码转换成可观察对象
  • 迭代流中的各个值
  • 把这些值映射成其它类型
  • 对流进行过滤
  • 组合多个流

2.安装

npm install rxjs

3. 方法

方法作用
of(…args)可以将普通JavaScript数据转为可观察序列
fromPromise(promise)将Promise转化为Observable
fromEvent(elment, eventName)从DOM事件创建序列,例如Observable.fromEvent($input, ‘click’)
ajax(urlAjaxRequest)
create(subscribe)这个属于万能的创建方法,一般用于只提供了回调函数的某些功能或者库,在你用这个方法之前先想想能不能用RxJS上的类方法来创建你所需要的序列
import {of,fromPromise,fromEvent,Observable} from 'rxjs'
import { ajax } from 'rxjs/ajax';
const nums = of(1, 2, 3); //将普通JavaScript数据转为可观察序列

//实际上Observable可以认为是加强版的Promise,它们之间是可以通过RxJS的API互相转换的
const ob = Observable.fromPromise(somePromise); // Promise转为Observable
const promise = someObservable.toPromise();        // Observable转为Promise

//从DOM事件创建序列
const el = document.getElementById('my-element')!;
const mouseMoves = fromEvent<MouseEvent>(el, 'mousemove');

//发送http请求
const apiData = ajax('/api/data');

//
Observable.create((observer) => {})
类别操作
创建from, fromPromise,fromEvent, of,ajax
组合combineLatest, concat, merge, startWith , withLatestFrom, zip
过滤debounceTime, distinctUntilChanged, filter, take, takeUntil
转换bufferTime, concatMap, map, mergeMap, scan, switchMap
工具tap
多播share

4. 创建可观察对象的函数

RxJS 提供了一些用来创建可观察对象的函数。这些函数可以简化根据某些东西创建可观察对象的过程,比如事件、定时器、承诺等等。

import {of } from 'rxjs'


//创建一个Observable观察序列 
//ob作为源会每隔1000ms发射一个递增的数据,即0 -> 1 -> 2
const ob= interval(1000);


//take(3)表示只取源发射的前3个数据,取完第3个后关闭源的发射
//map表示将流中的数据进行映射处理,这里我们将数据翻倍
//filter表示过滤掉出符合条件的数据,这里只有第三个数据会留下来
//前面已经使用同步编程创建好了一个流的处理过程,但此时ob作为源并不会立刻发射数据,如果我们在map中打印n是不会得到任何输出的,因为ob作为Observable序列必须被“订阅”才能够触发上述过程,也就是subscribe(发布/订阅模式)。
ob.take(3).map(n => n * 2).filter(n => n > 0).subscribe(n => console.log(n));

4. subscribe(订阅)

pipe方法用于链接可观察的运算符,而subscribe方法用于激活可观察的并监听发射的值.

** subscribe完整的函数**如下:
下面完整的包含3个函数的对象被称为observer(观察者),表示的是对序列结果的处理方式。 通过Observable的subscribe函数,观察者去订阅可观察者的消息。

  • next表示数据正常流动,没有出现异常;
  • error表示流中出错,可能是运行出错,http报错等等;
  • complete表示流结束,不再发射新的数据

** 特点:**

  • 在一个流的生命周期中,error和complete只会触发其中一个,可以有多个next(表示多次发射数据),直到complete或者error。
  • observer.next可以认为是Promise中then的第一个参数,observer.error对应第二个参数或者Promise的catch。
  • RxJS同样提供了catch操作符,err流入catch后,catch必须返回一个新的Observable。被catch后的错误流将不会进入observer的error函数,除非其返回的新observable出错。
ob.subscribe({
    next: d => console.log(d),
    error: err => console.error(err),
    complete: () => console.log('end of the stream')
})
import { of } from 'rxjs';

// RxJS同样提供了catch操作符,err流入catch后,catch必须返回一个新的Observable。被catch后的错误流将不会进入observer的error函数,除非其返回的新observable出错
of(1).map(n => n*n).catch(err => {
    // 此处处理catch之前发生的错误
    return of(0); // 返回一个新的序列,该序列成为新的流。
});

** subscribe简写方式 **:传入一个函数,会被当做next函数

6.pipe方法

import {of} from 'rxjs'
const ob = of(1,2,3);
const newOb = ob.pipe(
	tap(num=>console.log(num)),
	map(num=>'hello:'+num)
);
newOb.subscribe(data=>console.log(data))

map方法返回一个新的函数,
pipe函数使用闭包返回函数,函数里使用reduce方法把前一个操作的结果作为后一个操作的输入值。
刚开始tap和map里的函数还未执行,直到subscribe方法调用,传入的tap和map里的箭头函数才开始执行。

对于序列里的1,2,3,先执行管道里的tap和map操作,再把map操作的输出,作为输入去执行subscribe里指定的回调。

在这里插入图片描述

7.使用RsJS实现搜索功能

搜索是前端开发中很常见的功能:监听的keyup事件,然后将内容发送到后台,并展示后台返回的数据。

正常实现代码会存在以下2个问题:

  • 多余的请求:
    当想搜索“爱迪生”时,输入框可能会存在三种情况,“爱”、“爱迪”、“爱迪生”。而这三种情况将会发起 3 次请求,存在 2 次多余的请求。

  • 已无用的请求仍然执行:
    一开始搜了“爱迪生”,然后马上改搜索“达尔文”。结果后台返回了“爱迪生”的搜索结果,执行渲染逻辑后结果框展示了“爱迪生”的结果,而不是当前正在搜索的“达尔文”,这是不正确的。

减少多余请求次数:使用防抖;
**已无用的请求仍然执行 **的解决方式,可以在发起请求前声明一个当前搜索的状态变量,后台将搜索的内容及结果一起返回,前端判断返回数据与当前搜索是否一致,一致才走到渲染逻辑。
最终为以下代码:

<input id="text"></input>
<script>
   var text = document.querySelector('#text'),
        timer = null,
        currentSearch = '';

    text.addEventListener('keyup', (e) =>{
        clearTimeout(timer);// 在 250 毫秒内进行其他输入,则清除上一个定时器
        // 定时器,在 250 毫秒后触发
        timer = setTimeout(() => {
            // 声明一个当前所搜的状态变量
            currentSearch = '书'; 

            var searchText = e.target.value;
            $.ajax({
                url: `/search/${searchText}`,
                success: data => {
                    // 判断后台返回的标志与我们存的当前搜索变量是否一致
                    if (data.search === currentSearch) {
                        // 渲染展示
                        render(data);
                    } else {
                        // ..
                    }
                }           
            });
        },250)
    })
</script>

上面代码基本满足需求,但代码开始显得乱糟糟。我们来使用RxJS实现上面代码功能,如下:

import { fromEvent } from 'rxjs';
var text = document.querySelector('#text');
var inputStream = fromEvent(text, 'keyup') //为dom元素绑定'keyup'事件
                    .debounceTime(250) // 防抖动
                    .pluck('target', 'value') // 取值
                    .switchMap(url => Http.get(url)) // 将当前输入流替换为http请求
                    .subscribe(data => render(data)); // 接收数据

RxJS能简化你的代码,它将与流有关的内部状态封装在流中,而不需要在流外定义各种变量来以一种上帝视角控制流程。Rx的编程方式使你的业务逻辑流程清晰,易维护,并显著减少出bug的概率。

Logo

开放原子开发者工作坊旨在鼓励更多人参与开源活动,与志同道合的开发者们相互交流开发经验、分享开发心得、获取前沿技术趋势。工作坊有多种形式的开发者活动,如meetup、训练营等,主打技术交流,干货满满,真诚地邀请各位开发者共同参与!

更多推荐