响应式编程详解
响应式编程 (reactive programming) 是一种基于数据流 (data stream) 和 变化传递 ,以“非阻塞”和“异步”为特性,采用函数式的语法,实现并发执行效率。统一了java并发编程模型,使同步与异步的实现代码无明显差异。
响应式编程
1.多维度看全景
1.1响应式编程(Reactive Programming )
为了直观地了解什么是响应式,我们先从一个大家都比较熟悉的类比开始。首先打开Excel,在B、C、D三列输入如下公式:
B、C和D三列每个单元格的值均依赖其左侧的单元格,当我们在A列依次输入1、2和3时,变化会自动传递到了B、C和D三列,并触发相应状态变更,如下图:
我们可以把A列从上到下想象成一个数据流,每一个数据到达时都会触发一个事件,该事件会被传播到右侧单元格,后者则会处理事件并改变自身的状态。这一系列流程其实就是反应式的核心思想。
通过这个例子,你应该能感受到响应式的核心是数据流(data stream)。下面我们再来看一个例子。我们很多人每天都会坐地铁上下班,地铁每两分钟一班,并且同一条轨道会被很多地铁共享,你会不会因为担心追尾而不敢坐首尾两节车厢呢? 其实如果采用反应式架构构建地铁系统,就无需担心追尾问题。在反应式系统中,每辆地铁都会实时将自己的速度和位置等状态信息通知给上下游的其他地铁,同时也会实时的接收其他地铁的状态信息,并实时做出反馈。例如当发现下游地铁突然意外减速,则立即调整自身速度,并将减速事件通知到上游地铁,如此,整条轨道上的所有地铁形成一种背压机制(back pressure),根据上下游状态自动调整自身速度。
下面我们来看下维基百科关于反应式编程的定义:
响应式编程 (reactive programming) 是一种基于数据流 (data stream) 和 变化传递 (propagation of change) 的声明式 (declarative) 的编程范式。
从上面的定义中,我们可以看出反应式编程的核心是数据流以及变化传递。这一定义没有区分数据流的同步和异步模式, 更准确地说,异步数据流(asynchronous data stream)才是反应式编程的最佳实践。因为传统的编程方式是顺序(同步)执行的。当任务之间存在依赖关系时,上一个任务完成之后才会执行下一个任务。如果需要提高程序的响应速度,需要把同步执行的程序改为异步执行,这样方法的执行就变为了消息发送,能够挖掘多核CPU的能力、降低延迟和阻塞。细心的读者会发现,这不就是观察者模式(Observer Pattern)嘛! 其实反应式并不是指具体的技术,而是指一些架构设计原则, 观察者模式是实现反应式的一种手段。
经过上述的学习,响应式编程的特点可以归纳如下:
-
数据流:基于数据流模型,响应式编程提供了一套统一的Stream风格的数据处理接口。与java 8中的Stream相比,响应式编程除了支持静态数据流,还支持动态数据流,并且允许复用和同时接入多个订阅者。
-
变化传播:就是以一个数据流作为输入,经过一连串操作转化为另一个数据流,然后分发给各个订阅者的过程。
-
异步编程:传统的编程方式是顺序(同步)执行的。当任务之间存在依赖关系时,上一个任务完成之后才会执行下一个任务。如果需要提高程序的响应速度,需要把同步执行的程序改为异步执行,这样方法的执行就变为了消息发送。响应式编程提供了合适的异步编程模型能够挖掘多核CPU的能力、降低延迟和阻塞。
1.2函数式编程(Functional Programming, 简称FP)
java语言中,数据类型分为基本类型和引用类型,这些类型可以用于赋值和传递,通常被称为一等公民(这个术语是从20世纪60年代美国民权运动中借用来的)。编程语言中的其他结构也许有助于我们表示值的结构,但在程序执行期间不能传递,因而是二等公民。在Java8之前,方法和类就是二等公民。
人们发现,在运行时传递方法能将方法变成一等公民,这在编程中非常有用,因此Java 8的设计者把这个功能加入到了Java中。Java 8允许“把代码传递给方法”,这种方式简洁地表达了行为参数化。虽然Java 8之前可以用匿名类实现行为参数化,但Java 8代码更加简洁、可读性更好、还让我们能够使用一整套新技巧(流操作)。这种函数可以被来回传递并加以组合,从而产生强大的编程语汇,我们将这种编程范式称为函数式编程。
下面仍以java8中的函数为例,总结函数式编程的特点如下:
-
java8中函数是“一等公民”(First Class)。函数与其他数据类型一样,处于平等地位,可以赋值给其他变量、作为入参或返回值。
-
java8支持闭包和高阶函数。(闭包是起函数的作用并可以像对象一样操作的对象。高阶函数可以使用另一个函数作为输入,同时支持返回一个函数作为输出。这两种结构结合在一起可以以优雅的方式进行模块化编程。)
-
递归。在函数式编程中,变量都是不可变的(immutable),所以函数式编程无法实现循环操作。循环操作需要依赖一个可变的状态做为跳出循环的条件。因此函数式编程把递归作为控制流程的机制。
-
惰性求值(Lazy Evaluation)。支持惰性求值的编译器会像数学家看待代数表达式那样看待函数式编程的程序,即抵消相同项从而避免执行无谓的代码。
-
没有“副作用”(Side Effect)。函数要保持独立,所有功能就是返回一个新的值,没有其他行为,不会修改外部变量的值。没有并发编程中的线程安全问题。
1.3技术演进
Future模式: JDK5中的Future模式是多线程的一种设计模式。我们可以提交任务给一个Future,Future替我们完成任务,期间我们可以去做其他想做的事情,一段时间后从Future中获取结果。虽然Future模式支持获取异步执行的任务结果,但它并没有提供通知的机制,所以我们无法得知Future什么时候完成任务。要么使用isDone()不停轮询判断Future是否完成,这样会耗费CPU资源;要么执行future.get()等待Future返回的结果,这会使程序变为同步操作。
CompletableFuture弥补了Future模式的缺点,在异步任务完成后,可以将其结果通过 thenAccept、thenApply、thenCompose等操作交给另一个异步事件处理线程来处理。(即事件通知,回调)
总结:响应式编程 (reactive programming) 是一种基于数据流 (data stream) 和 变化传递 ,以“非阻塞”和“异步”为特性,采用函数式的语法,实现并发执行效率。统一了java并发编程模型,使同步与异步的实现代码无明显差异。
1.4Rx是什么
ReactiveX是Reactive Extensions的缩写,一般简写为Rx,由微软的架构师Erik Meijer领导的团队开发,在2012年11月开源的编程模型,目标是提供一致的编程接口,帮助开发者更方便的处理异步数据流。Rx近几年越来越流行了,现在已经支持几乎全部的流行编程语言了。Rx的大部分语言库由ReactiveX这个组织负责维护,社区网站是 reactivex.io。
微软给的定义是,Rx是一个函数库。使用Rx开发者可以用Observables表示异步数据流,用LINQ操作符查询、组合、变换异步数据流, 用Schedulers参数化异步数据流的并发处理。Rx可以这样定义:Rx = Observables + LINQ + Schedulers。ReactiveX结合了观察者模式、迭代器模式和函数式编程的精华。
向对象编程和面向切面编程(Aspect Orient Programming)在java领域是最常见的两种编程思想。Rx又不仅仅是一个编程函数库,它是一种编程思想的突破。它影响了许多其它的程序库和框架以及编程语言,很多公司都在使用Rx。
1.5响应式宣言
2013年6月,Roland Kuhn等人发布了《响应式宣言》, 该宣言定义了反应式系统应该具备的一些架构设计原则。符合反应式设计原则的系统称为反应式系统。根据反应式宣言, 反应式系统需要具备即时响应性(Responsive)、回弹性(Resilient)、弹性(Elastic)和消息驱动(Message Driven)四个特质,以下内容摘自反应式宣言官网:
-
即时响应性(Responsive)。系统应该对用户的请求即时做出响应。即时响应是可用性和实用性的基石, 而更加重要的是,即时响应意味着可以快速地检测到问题并且有效地对其进行处理。
-
回弹性(Resilient)。 系统在出现失败时依然能保持即时响应性, 每个组件的恢复都被委托给了另一个外部的组件, 此外,在必要时可以通过复制来保证高可用性。 因此组件的客户端不再承担组件失败的处理。
-
弹性(Elastic)。 系统在不断变化的工作负载之下依然保持即时响应性。 反应式系统可以对输入负载的速率变化做出反应,比如通过横向地伸缩底层计算资源。 这意味着设计上不能有中央瓶颈, 使得各个组件可以进行分片或者复制, 并在它们之间进行负载均衡。
-
消息驱动(Message Driven)。反应式系统依赖异步的消息传递,从而确保了松耦合、隔离、位置透明的组件之间有着明确边界。 这一边界还提供了将失败作为消息委托出去的手段。 使用显式的消息传递,可以通过在系统中塑造并监视消息流队列, 并在必要时应用回压, 从而实现负载管理、 弹性以及流量控制。 使用位置透明的消息传递作为通信的手段, 使得跨集群或者在单个主机中使用相同的结构成分和语义来管理失败成为了可能。 非阻塞的通信使得接收者可以只在活动时才消耗资源, 从而减少系统开销。
2.钻进去看本质
2.1名称解释(rajava)
2.2观察者模式
设计模式分为三类:创建型模式是将创建和使用代码解耦;结构型模式是将不同功能的代码解耦;行为型模式是将不同的行为代码解耦。观察者模式是行为型模式。
在Rx中,很多指令可能是并行执行的,之后他们的执行结果才会被“观察者”捕获。为达到这个目的,在RxJava中,一个实现了Observer接口的对象可以订阅(subscribe)一个Observable 类的实例。订阅者(subscriber)对Observable发射(emit)的任何数据或数据序列作出响应。这种模式简化了并发操作,因为它不需要阻塞等待Observable发射数据,而是创建了一个处于待命状态的观察者哨兵,哨兵在未来某个时刻响应Observable的通知。
Subscribe方法用于将观察者连接到Observable,观察者需要实现以下方法的一个子集:
- onNext(T item):
Observable调用这个方法发射数据,方法的参数就是Observable发射的数据,这个方法可能会被调用多次,取决于你的实现。 - onError(Exception ex)
当Observable遇到错误或者无法返回期望的数据时会调用这个方法,这个调用会终止Observable,后续不会再调用onNext和onCompleted,onError方法的参数是抛出的异常。 - onComplete
正常终止,如果没有遇到错误,Observable在最后一次调用onNext之后调用此方法。
这种方法的优点是,如果你有大量的任务要处理,它们互相之间没有依赖关系。你可以同时开始执行它们,不用等待一个完成再开始下一个(用这种方式,你的整个任务队列能耗费的最长时间,不会超过任务里最耗时的那个)。
在实际的项目中,被观察者(Observable)和观察者(Observer)这两个对象的称呼比较灵活,叫法多种多样,比如:Subject-Observer、Publisher-Subscriber、Producer-Consumer、EventEmitter-EventListener、Dispatcher-Listener。这个模型通常被称作Reactor模式。Reactor 反应堆设计模式
根据应用场景的不同,观察者模式会对应不同的代码实现方式:有同步阻塞的实现方式,也有异步非阻塞的实现方式;有进程内的实现方式,也有跨进程的实现方式。
- 同步阻塞方式
// 观察者
public interface RegObserver {
void handleRegSuccess(long userId);
}
// 促销观察者
public class RegPromotionObserver implements RegObserver {
private PromotionService promotionService; // 依赖注入
@Override
public void handleRegSuccess(long userId) {
promotionService.issueNewUserExperienceCash(userId);
}
}
// 提醒观察者
public class RegNotificationObserver implements RegObserver {
private NotificationService notificationService;
@Override
public void handleRegSuccess(long userId) {
notificationService.sendInboxMessage(userId, "Welcome...");
}
}
// 第一种实现
public class UserController {
private UserService userService; // 依赖注入
private List<RegObserver> regObservers = new ArrayList<>();
// 把观察者注册,一次性设置好(支持动态的修改)
public void setRegObservers(List<RegObserver> observers) {
regObservers.addAll(observers);
}
public Long register(String telephone, String password) {
//...省略非核心代码
long userId = userService.register(telephone, password);
// 遍历观察者,消息回调(执行过程会阻塞当前线程)
for (RegObserver observer : regObservers) {
observer.handleRegSuccess(userId);
}
return userId;
}
}
- 异步非阻塞方式
// 第二种实现方式,其他类代码不变
public class UserController {
private UserService userService; // 依赖注入
private List<RegObserver> regObservers = new ArrayList<>();
private Executor executor;
public UserController(Executor executor) {
this.executor = executor;
}
public void setRegObservers(List<RegObserver> observers) {
regObservers.addAll(observers);
}
public Long register(String telephone, String password) {
//...省略非核心代码
long userId = userService.register(telephone, password);
// 异步执行观察者逻辑
for (RegObserver observer : regObservers) {
executor.execute(new Runnable() {
@Override
public void run() {
observer.handleRegSuccess(userId);
}
});
}
return userId;
}
}
上述两种实现方式的缺点是,所有逻辑都耦合在register函数中,该观察者模式无法被其他业务复用。而RxJava中提供的观察者模式隐藏实现细节,解耦业务与非业务代码,让程序员聚焦业务开发,可复用性更好。
Observable分类:
根据Observable开始发射数据序列的时机分类。
分类 | 翻译 | 含义 | 弹珠图 |
---|---|---|---|
Cold Observable | “冷”被观察者 | 只有观察者订阅了,才开始执行发送数据流的代码。==Cold Observable和Observer只能是一对一的关系。==当有多个不同的订阅者时,消息是完整发送的,它们各个事件是独立的。意味着,被观察者同一个发射事件要执行多次,发送给不同的订阅者。而Cold Observable是一张音乐CD,人们可以独立购买并听取它。 | - |
Hot Observable | “热”被观察者 | 一个"热"的Observable可能一创建完就开始发射数据,因此所有后续订阅它的观察者可能从序列中间的某个位置开始接受数据(有一些数据错过了)。Hot Observable好比是一个广播电台,所有在此刻收听的观众都会听到同一首歌。 | Publish将普通的Observable转换为可连接的Observable |
Connectable Observable | - | 不管有没有观察者订阅它,这种Observable都不会开始发射数据,除非Connect方法被调用。 | - |
观察者模式 vs 消息队列
响应式编程模式可以实现功能的解耦,但如果不同的系统之间需要解耦,就需要借助消息队列(Message Queue)来实现。弊端是需要引入一个新的系统(消息队列),增加了维护成本。优势是被观察者和观察者解耦更加彻底,两部分的耦合更小,被观察者完全不感知观察者,同理,观察者也完全不感知被观察者。被观察者只管发送消息到消息队列,观察者只管从消息队列中读取消息来执行相应的逻辑。
而在观察者模式中中,观察者需要注册到被观察者中,被观察者需要依次遍历观察者来发送消息。
2.3HelloWorld
同步版本
void helloWorld1() {
// 在同一个线程中同步执行
Observable.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(@NonNull ObservableEmitter<String> emitter) throws Exception {
emitter.onNext("Hello World");
emitter.onNext("Hello World2");
emitter.onComplete();
}
}).subscribe(new Consumer<String>() {
@Override
public void accept(@NonNull String s) throws Exception {
System.out.println(s);
}
});
}
异步版本
默认情况下,RxJava只在当前线程中运行,它是单线程的。Observable用于发射数据流,Observer用于接收和响应数据流,各种操作符(Operators)用于加工数据流,他们都是在同一个线程中运行,这是一种同步的函数响应式编程。
可以给Observable操作符添加多线程功能,可指定操作符在特定的调度器上执行。某些操作符有变体形式,可以接受一个Scheduler参数,操作符的任务将在指定的调度器上执行。
observeOn接收一个Scheduler参数,指定下游操作运行在特定的线程调度器Scheduler上(即在特定的调度器上调用观察者的onNext、onError、onComplete方法)。
subscribeOn接收一个Scheduler参数,指定上游操作(发射数据相关)在特定的线程调度器Scheduler上。
void helloWorld2() {
Observable.just("Hello World1", "Hello World2")
.subscribeOn(Schedulers.single())
.observeOn(Schedulers.io())
.subscribe(new Consumer<String>() {
@Override
public void accept(String s) throws Exception {
System.out.println(s);
}
});
}
背压(回压)版本
RxJava中只有Flowable支持背压。
RxJava是基于Push模型的。如果生产者比较慢,则消费者会等待数据的到来;如果生产者比较快,会导致很多数据发射给消费者,而不管消费者当前是否有能力处理数据,这样就会导致背压。
RxJava 2.x中的背压策略有:MISSING、ERROR(抛异常)、BUFFER(可以无限缓存,可能OOM)、DROP(丢弃)、LATEST(与DROP策略一样,不同的是,不管缓存池的状态如何,LATEST策略会将最后一条数据强行放入缓存池中。)。
void helloWorld_backPressure() {
Flowable.create(new FlowableOnSubscribe<String>() {
@Override
public void subscribe(FlowableEmitter<String> flowableEmitter) throws Exception {
for (int i = 0; i < 128; i++) { //
flowableEmitter.onNext("hello world " + i);
}
}
// 如果放入Flowable的异步缓存池中的数据超限了,则会抛出 MissingBackpressureException 异常。Flowable的默认队列是128,所以将案例中的129改成128程序就可以正常运行了。
}, BackpressureStrategy.ERROR)
.doOnNext(new Consumer<String>() {
@Override
public void accept(String content) throws Exception {
System.out.println("发射数据的线程:" + Thread.currentThread().getName() + ", 内容为:" + content);
}
})
.subscribeOn(Schedulers.newThread())
.observeOn(Schedulers.io())
.subscribe(new Consumer<String>() {
@Override
public void accept(String content) throws Exception {
System.out.println("接收数据的线程:" + Thread.currentThread().getName() + ", 内容为:" + content);
}
});
try {
Thread.sleep(2000L);
} catch (Exception e) {
}
}
并行版本
上述案例中,被观察者(Observable/Flowable/Single/Completable/Maybe)发射的数据流可以经历各种线程切换,但是数据流的各个元素之间不会产生并行执行的效果。并行不是并发,更不是异步。
先看一段java8并行流执行的效果。
void java8Parallel() {
List<Integer> result = new ArrayList<>();
for (Integer i = 1; i <= 100; i++) {
result.add(i);
}
result.parallelStream()
.map(new Function<Integer, String>() {
@Override
public String apply(Integer integer) {
return integer.toString();
}
})
.forEach(new Consumer<String>() {
@Override
public void accept(String s) {
System.out.println("s=" + s + "; Current Thread Name=" + Thread.currentThread().getName());
}
});
}
从执行结果看,java8底层借助JDK 的fork/join框架实现并行编程。
RxJava利用flatMap实现并行,代码如下:
void flatMapParallel() {
Observable.range(1,100)
.flatMap(new Function<Integer, ObservableSource<String>>() {
@Override
public ObservableSource<String> apply(Integer integer) throws Exception {
return Observable.just(integer)
.subscribeOn(Schedulers.computation())
.map(new Function<Integer, String>() {
@Override
public String apply(Integer integer) throws Exception {
System.out.println("map 操作所在线程:" + Thread.currentThread().getName() + ", data:" + integer);
return integer.toString();
}
});
}
})
.subscribe(new Consumer<String>() {
@Override
public void accept(String s) throws Exception {
System.out.println("接收数据线程:" + Thread.currentThread().getName() + ", data:" + s);
}
});
try {
Thread.sleep(100L);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
Round-Robin算法
通过算法把数据按线程分组,每组个数相同,一起发送处理,这样可以减少Observable的创建,从而节省系统资源。
void roundRobinParallel() {
final AtomicInteger batch = new AtomicInteger(0);
Observable.range(1, 100)
.groupBy(new Function<Integer, Integer>() {
@Override
public Integer apply(Integer integer) throws Exception {
return batch.getAndIncrement() % 5;
}
})
.flatMap(new Function<GroupedObservable<Integer, Integer>, ObservableSource<String>>() {
@Override
public ObservableSource<String> apply(GroupedObservable<Integer, Integer> integerIntegerGroupedObservable) throws Exception {
return integerIntegerGroupedObservable.observeOn(Schedulers.io())
.map(new Function<Integer, String>() {
@Override
public String apply(Integer integer) throws Exception {
System.out.println("map 操作所在线程:" + Thread.currentThread().getName() + ", data:" + integer);
return integer.toString();
}
});
}
})
.subscribe(new Consumer<String>() {
@Override
public void accept(String s) throws Exception {
System.out.println(s);
}
});
}
ParallelFlowable实现并行
在相应的操作符上调用Flowable的parallel() 方法就会返回 ParallelFlowable。 默认并行级别被设置为可用CPU的数量(Runtime.getRuntime().availableProcessors())
void parallelFlowable() {
// parallel()方法只准备并行流
ParallelFlowable<Integer> parallel = Flowable.range(1, 100).parallel();
// 通过runOn(Scheduler)操作符来定义异步
parallel.runOn(Schedulers.io())
.map(new Function<Integer, String>() {
@Override
public String apply(Integer integer) throws Exception {
System.out.println("map 操作所在线程:" + Thread.currentThread().getName() + ", data:" + integer);
return integer.toString();
}
})
.sequential() // 返回到顺序流
.subscribe(new Consumer<String>() {
@Override
public void accept(String s) throws Exception {
System.out.println("接收数据线程:" + Thread.currentThread().getName() + ", data:" + s);
}
});
try {
Thread.sleep(2000L);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
Observable.flatMap 和 ParallelFlowable比较:
-
借助flatMap操作符进行分离和加入一个序列较为复杂,并引起一定的开销。ParallelFlowable开销会更小一些
-
ParallelFlowable的操作符很有限,如果有一些特殊的操作需要并行而ParallelFlowable又不支持时,就需要使用基于flatMap来实现并行。
2.4 Observable操作符
- 创建Observable(创建操作)
- 组合多个Observable(组合操作)
- 对Observable发射的数据执行变化操作(变换操作)
- 对Observable发射的数据中取特定的值(过滤操作)
- 转发Observable的部分值(条件、布尔、过滤操作)
- 对Observable发射的数据序列求值(算术、聚合操作)
2.5 EventBus vs RxJava
EventBus是事件总线框架,是一个消息发布——订阅的模式,工作机制类似于观察者模式,通过通知者去注册观察者,最后由通知者向观察者发布消息。它可以利用很少的代码,来实现多组件间通信。
EventBus早起出自于Google Guava。
RxJava给我们带来了响应式编程,使用RxJava可以实现EventBus的全部功能。如果项目中已经使用RxJava,可以无须使用EventBus。
eventBus demo 略。
3.参考资料
弹珠图
RxJava创建操作符
《RxJava2.x 实战》一书中的相关的例子
RxJava在github上的开源地址
Rxjava关于Disposable你应该知道的事
开放原子开发者工作坊旨在鼓励更多人参与开源活动,与志同道合的开发者们相互交流开发经验、分享开发心得、获取前沿技术趋势。工作坊有多种形式的开发者活动,如meetup、训练营等,主打技术交流,干货满满,真诚地邀请各位开发者共同参与!
更多推荐
所有评论(0)