Rxjs技术简介
RxJS是ReactiveX编程理念的JavaScript版本。ReactiveX来自微软,它是一种针对异步数据流的编程。简单来说,它将一切数据,包括HTTP请求,DOM事件或者普通数据等包装成流的形式,然后用强大丰富的操作符对流进行处理,使你能以同步编程的方式处理异步数据,并组合不同的操作符来轻松优雅的实现你所需要的功能。更简洁地实现异步和事件驱动应用。
Rxjs技术简介
一、背景
RxJS是ReactiveX编程理念的JavaScript版本。ReactiveX来自微软,它是一种针对异步数据流的编程。简单来说,它将一切数据,包括HTTP请求,DOM事件或者普通数据等包装成流的形式,然后用强大丰富的操作符对流进行处理,使你能以同步编程的方式处理异步数据,并组合不同的操作符来轻松优雅的实现你所需要的功能。更简洁地实现异步和事件驱动应用。
ReactiveX结合了Observer模式、Iterator模式和函数式编程和集合来构建一个管理事件序列的理想方式。
版本迭代
最新版本 7.0.0-beta.12
二、技术特点
-
可组合
而Observables是专门设计用来组合异步数据流的。 -
弹性强(灵活)
支持单标量值,而且支持数据序列甚至是无限的数据流。 -
具体实现没有限制
没有对任何并发和异步库的依赖,可以用线程池、事件循环、非阻塞I/O、actors等任何技术来实现,用任何适应的风格。Observable应该永远被认为是异步的,无论底层实现是阻塞还是非阻塞的。可以随时修改实现而不影响使用。 -
Callbacks的问题
对于单层异步任务,简单易用。而对于需要多层嵌套的场景就难于处理了。
三、主要技术
3.1 基本概念
在RxJS中管理异步事件的基本概念如下:
- 流:具有方向和分支的事件
- Observable:代表了一个调用未来值或事件的集合
- Observer:代表了一个知道如何监听Observable传递过来的值的回调集合
- Subscription:代表了一个可执行的Observable,主要是用于取消执行
- Subject:相当于一个EventEmitter,它的唯一的方法是广播一个值或事件给多个Observer
- Operators:是一个纯函数,允许处理集合与函数式编程风格的操作,比如map、filter、concat、flatMap等
- Schedulers:是一个集中式调度程序来控制并发性,允许我们在setTimeout或者requestAnimationFrame上进行协调计算
3.2 例子
3.2.1 定时器
需求:从0开始按顺序每秒输出一个整数
常规解决方法:
function CountCase1() {
let count = 0;
let timer = setInterval(() => {
count += 1;
console.log(`Count case1: ${count}`);
}, 1000);
}
CountCase1();
Rxjs解决方案:
import { interval } from 'rxjs';
import { take } from 'rxjs/operators';
function CountCase2() {
return interval(1000).pipe(take(1000));
}
CountCase2().subscribe((num) => console.log('Count case2: ', num));
3.2.2 创建一个Observable
import { Observable } from 'rxjs';
function Create() {
return Observable.create((observer: any) => {
observer.next(1);
observer.next(2);
observer.next(3);
setTimeout(() => {
observer.next(4);
observer.complete();
}, 1000);
});
}
Create().subscribe({
next: (x: any) => console.log(`got value${x}`),
error: (err: any) => console.error(`somthing wrong occurred: ${err}`),
complete: () => console.log('done')
});
console.log('just after subscribe');
结论:
- 创建的Observable对象是异步的
- 调用方式类似于函数调用
- Observable对象能够返回多个值
3.2.3 取消一个Observable
Subscription代表了一个一次性的资源,通常表示的是一个Observable execution。一个Subscription有一个重要的方法,unsubscribe,它不需要参数,仅仅是处理subscription的资源。在之前的RxJS版本中,Subscription被称作"Disposable"。
一个Subscription实质上是一个unsubscribe()函数,用来释放资源或者取消一个Observable executions。
import { Observable } from 'rxjs';
function Create() {
return Observable.create((observer: any) => {
observer.next(1);
observer.next(2);
observer.next(3);
setTimeout(() => {
observer.next(4);
observer.complete();
}, 1000);
});
}
const subscription = Create().subscribe({
next: (x: any) => console.log(`got value${x}`),
error: (err: any) => console.error(`somthing wrong occurred: ${err}`),
complete: () => console.log('done')
});
subscription.unsubscribe();
console.log('just after subscribe');
3.2.4 多路广播
一个RxJS Subject是一个特殊类型的Observable,它允许值可以多路广播给多个Observers。普通的Observables是单路广播(每个subscribed Observer拥有自己独立的Observable execution),Subjects是多路广播。
import { Subject } from 'rxjs';
function Create() {
return new Subject();
}
const subject = Create();
subject.subscribe({
next: (x: any) => console.log(`got value in case 1: ${x}`),
error: (err: any) => console.error(`somthing wrong occurred in case 1: ${err}`),
complete: () => console.log('in case 1 done')
})
subject.subscribe({
next: (x: any) => console.log(`got value in case 2: ${x}`),
error: (err: any) => console.error(`somthing wrong occurred in case 2: ${err}`),
complete: () => console.log('in case 2 done')
})
subject.next(1);
subject.next(2);
3.2.5 Operators
Opeartors是Obsrevable的方法,就像map()、filter()、merge()等。当它被调用时,它们并不改变已经存在的Observable,而是返回一个基于第一个Observable上新的Observable。
import { of, forkJoin, Observable, throwError, race } from 'rxjs';
function Create1(): Observable<string> {
return of('Operater Demo 1');
}
function Create2(): Observable<string> {
return of('Operater Demo 2');
}
function Create3(): Observable<string> {
return Observable.create((observer: any) => {
observer.next(1);
observer.next(2);
observer.next(3);
setTimeout(() => {
observer.next(4);
observer.complete();
}, 1000);
});
}
返回一个字符串
const subject = Create1();
subject.subscribe({
next: (x: any) => console.log(`got string: ${x}`),
error: (err: any) => console.error(`somthing wrong occurred: ${err}`),
complete: () => console.log('done')
})
对比Promise
- 并发请求
const subject1 = Create1();
const subject2 = Create2();
const subject3 = Create3();
console.log('start');
forkJoin([subject1, subject2]).subscribe(
(data: Array<string>) => {
data.map(item => console.log(item));
});
console.log('done');
- 竞争请求
console.log('start');
race([subject1, subject2]).subscribe(
(data: string) => {
console.log(data);
});
console.log('done');
- 对于持续流的处理
console.log('start');
forkJoin([subject1, subject3]).subscribe({
next: (data: Array<string>) => {
data.map(item => console.log(item));
}});
console.log('done');
- 对于并发异常对处理
import { Observable, of, throwError } from 'rxjs';
import { catchError, map, mergeAll } from "rxjs/operators";
function Create(): Observable<any> {
return of([
1,
2,
() => throwError('exception.'),
3,
]).pipe(
map(val => val),
mergeAll(),
catchError(() => of('error catched.'))
);
}
var observable = Create()
var finalObserver = {
next: (x: any) => console.log('got value ' + x),
error: (err: any) => console.error('something wrong occurred: ' + err),
complete: () => console.log('done')
};
console.log('just before subscribe');
observable.subscribe(finalObserver);
console.log('just after subscribe');
3.2.6 Scheduler
当一个subscription开始工作或者notifications被传递,scheduler就会开始调用。
import { Observable, asyncScheduler } from 'rxjs';
import { observeOn } from "rxjs/operators";
function Create(): Observable<any> {
return Observable.create((proxyObserver: any) => {
proxyObserver.next(1);
proxyObserver.next(2);
proxyObserver.next(3);
proxyObserver.complete();
}).pipe(
observeOn(asyncScheduler)
);
}
var observable = Create()
var finalObserver = {
next: (x: any) => console.log('got value ' + x),
error: (err: any) => console.error('something wrong occurred: ' + err),
complete: () => console.log('done')
};
console.log('just before subscribe');
observable.subscribe(finalObserver);
console.log('just after subscribe');
参考资料
[1] ReactiveX简介
[2] rxjs
[3] RxJS进阶——关于流的理解和应用
[4] ReactiveX官网
[5] Rxjs源码
[6] Rxjs中文文档
开放原子开发者工作坊旨在鼓励更多人参与开源活动,与志同道合的开发者们相互交流开发经验、分享开发心得、获取前沿技术趋势。工作坊有多种形式的开发者活动,如meetup、训练营等,主打技术交流,干货满满,真诚地邀请各位开发者共同参与!
更多推荐
所有评论(0)