【Flutter 异步编程 - 拾】 | 探索 Stream 的转换原理与拓展
theme: cyanosis本文为稀土掘金技术社区首发签约文章,14天内禁止转载,14天后未获授权禁止转载,侵权必究!张风捷特烈 - 出品在上一篇中,我们介绍了 Stream 内置的流元素转化相关操作。这一篇,也是该专题的最后一篇,将带大家探索一下 Stream 中元素转换的原理,并基于此来拓展更多的操作。一、分析 Stream内置转换方法这小节我们将...
theme: cyanosis
本文为稀土掘金技术社区首发签约文章,14天内禁止转载,14天后未获授权禁止转载,侵权必究!
张风捷特烈 - 出品
在上一篇中,我们介绍了 Stream
内置的流元素转化相关操作。这一篇,也是该专题的最后一篇,将带大家探索一下 Stream
中元素转换的原理,并基于此来拓展更多的操作。
一、分析 Stream 内置转换方法
这小节我们将通过对 map
、take
和 reduce
方法的源码实现进行分析。其中 map
、take
将原流转换成另一个 Stream 流;而 reduce
会对元素迭代运算,返回 一个Future元素。其他的方法都属于这两大类,可以举一反三地进行理解。
1. Stream#map 的源码实现
先来回顾一下 Stream#map
方法的作用,它可以将原流转换为其他类型的流:
如下所示,是 map
方法的实现,其中只有一个入参:名叫 convert 的函数对象,该函数的作用是
将当前流中 T 类型元素,转换为 S 类型元素。
---->[Stream#map]---- Stream<S> map<S>(S convert(T event)) { return new _MapStream<T, S>(this, convert); }
map 方法实现本身是非常简单,它会生成一个 _MapStream
对象作为返回值,这点从如下调试中可以证实。从这里很容易推断出 _MapStream
是 Stream
的实现类,有两个泛型分别表示 输入类型 T
和 输出类型 S
; 在构造函数中需要转入输入流和转换函数 convert
。
也就是说,Stream#map
方法是通过自定义 Stream
实现类来完成的,所以现在关键是 _MapStream
中的逻辑处理。下面是 _MapStream
的的所有代码,它继承自 _ForwardingStream
类。本类中维护成员对象 _transform
,会在构造时赋值为转换函数 ;另外会通过父类构造初始化 Stream
成员对象:
``` typedef T _Transformation(S value);
class _MapStream extends _ForwardingStream { final _Transformation _transform;
MapStream(Stream source, T transform(S event)) : this.transform = transform, super(source);
void handleData(S inputEvent, _EventSink sink) { T outputEvent; try { outputEvent = _transform(inputEvent); //tag1 } catch (e, s) { _addErrorWithReplacement(sink, e, s); return; } sink. add(outputEvent); } } ```
除了构造函数之外,_MapStream
中只有 _handleData
方法用于数据转化。我们在 map
中传入的回调方法,最终会在上面的 tag1
处被回调。这可以通过调试很容易知晓:
现在有两个很关键的问题:_MapStream#_handleData
是如何被触发的,原流又是如何转化为新流的。这些在 _MapStream
源码中并未表现出,所以一定在其基类中实现的。
2. 探索 _ForwardingStream 类
先来看一下这个类的定义,它继承自 Stream
,需要非常注意一点:继承时 Stream
的泛型是 T
,也就是说该流中元素是 T 类型的。在 _ForwardingStream
中可以接受两个泛型,其中第一个是 S
,第二个是 T
。 前面说过,该类中会持有 Stream<S>
类型对象,也就是输入流。所以 T
类型代表的是输出类型,这就是 _ForwardingStream
代表的是 输出流
的原因。
其中 _handleData
是唯一的抽象方法,也是之后触发转换操作的地方。
dart abstract class _ForwardingStream<S, T> extends Stream<T> { // 略... void _handleData(S data, _EventSink<T> sink); }
对于 Stream
的实现类而言,最重要的是实现 listen
抽象方法,创建 StreamSubscription
对象。如下所示,创建的 _ForwardingStreamSubscription
对象。在 《深入剖析 Stream 机制源码实现》 中我们已经知道,listen 方法的第一入参最终归宿是 StreamSubscription
的实现类。如下可以看出,当前流 的listen
方法返回的是 _ForwardingStreamSubscription
对象:
``` StreamSubscription listen(void onData(T value)?, {Function? onError, void onDone()?, bool? cancelOnError}) { return _createSubscription(onData, onError, onDone, cancelOnError ?? false); }
StreamSubscription
_createSubscription(void onData(T data)?, Function? onError, void onDone()?, bool cancelOnError) { return new _ForwardingStreamSubscription
( this, onData, onError, onDone, cancelOnError); } ```
【敲黑板】 在 _ForwardingStreamSubscription
构造方法中,做了两件事:
-
- 通过入参为输出流
_ForwardingStream
类型成员_stream
赋值。
- 通过入参为输出流
-
- 对输入流进行监听,每次原流中元素激活时,会触发
本类#_handleData
方法。
- 对输入流进行监听,每次原流中元素激活时,会触发
在 _ForwardingStreamSubscription#_handleData
中,会触发输出流的 _handleData
方法,也就是在 _MapStream
中的那个具体实现。其中的 data
是原流中的激活元素;这里第二参非常值得注意,可以自己先好好品味一下这个 this
是什么,在接下来会进行说明。
---->[_ForwardingStreamSubscription#_handleData]---- void _handleData(S data) { _stream._handleData(data, this); }
可以在下面调试中看到方法栈情况:在新流的订阅者触发 _handleData
时,触发 _MapStream#_handleData
方法。现在仔细看下面 _handleData
的第二参,是一个 _EventSink<T>
对象,它就是上面的那个 this
对象,即 _ForwardingStreamSubscription
类型对象,也就是 新流的订阅者对象 。
上面在 213 行
触发转换函数,得到 T
类型的 outputEvent
对象,然后会在 218 行
通过 sink
将新元素添加到新流。可以形象地通过下面图进行理解,想象一下上下有两个水管,椭圆是一滴水,代表流中的一个元素。它会通过 _transform
方法转化为另一滴水,这滴水可能是其他类型。此时的 sink
就像是输出流管道的入口,转换之后的水滴将从这个入口流进管道。
另外,订阅者对象 如何通过添加元素在 《深入剖析 Stream 机制源码实现》 中已经介绍过了,这里就不赘述了。到这里,其实流转化的原理就介绍完了。总的来看并不复杂,就是创建一个新流,监听输入流,再触发转换方法,将新元素加入新流而已。
3. Stream#tack 的源码实现
同样 Stream#tack
方法也是类似,返回的是新流 _TakeStream
。 它也是继承自 _ForwardingStream
,在构造函数中会通过父类构造初始化 _stream 输入流
, 另外它自己会持有 _count
成员,用于表示期望取得元素的个数。
Stream<T> take(int count) { return new _TakeStream<T>(this, count); }
在上一篇说过,take
方法会在取得期望个数的元素之后,停止时间线。也就是说,它具有中断订阅的功能,和 map
有所不同,这也是为什么要单独拿出来分析源码的原因。
从源码中可以看出,它覆写了 _createSubscription
方法,其中如果 _count = 0
,会直接返回 _DoneStreamSubscription
完成订阅。否则,返回的是 _StateStreamSubscription
,其中 _count
会作为构造入参:
其下 _StateStreamSubscription
继承自 _ForwardingStreamSubscription
,额外维护 _subState
成员。该成员从运行结果上来看,将用于控制订阅的完成,等下就能看到它的作用。
我们已经知道,当输入流的元素激活时,会触发 _handleData
方法。其中的 sink
就是新流的订阅者,从 tag1
出可以看到 _subState
的作用,就是 take
中传入的个数。每当触发 _handleData
时,元素就会通过 sink
加入新流中,然后就会 _subState 会 -1
。从 tag2
出可以看出,当个数为 0
时,就会关闭,这就是 take
可以在满足条件户中断监听的本质原因。
---->[_TakeStream#_handleData]--- void _handleData(T inputEvent, _EventSink<T> sink) { var subscription = sink as _StateStreamSubscription<int, T>; int count = subscription._subState; //tag1 if (count > 0) { sink._add(inputEvent); count -= 1; subscription._subState = count; if (count == 0) { sink._close(); // tag2 } } }
到这里,输入流转化为输出流的两种场景,就从源码的角度介绍完毕了。总的来看并不是非常复杂,想象成两个管道中水流的变化,就非常容易理解。
4. Stream#reduce 的源码实现
reduce
方法源码实现其实非常简单,它最终返回的是一个元素,也就是说它会将所有元素按某个规则进行整合。下面仍通过水管流水的场景,来说明一下 reduce
的含义。比如现在有一个 1~6
逐渐激活的流,水会流进一个杯子里。其中杯子里的东西叫做 previous
, 当一个元素流出时,它叫做 element
。
杯子可以指定对 previous
和 element
的操作函数,比如这里是加法。那第一滴水落入杯子后, previous
就是 1+2=3
。这样下一滴水落入杯子,是 3+3=6
,之后同理。可以想象一下,水滴逐步流到杯子里的场景,当水流尽后,杯子里的值就是最终返回的结果数据。
如下所示,reduce
的源码并没有引入其他的类,直接监听自身,获取 StreamSubscription
对象,监听 onData
的触发,当非首位元素,会进行 combine
操作。这里的 _runUserCode
方法,可能有些朋友看作比较晕:
这个方法很简单,两个函数入参,第一个先触发,作为第二个函数的回调值。对于上面而言,就约等于 value = combine(value, element)
; 这里使用 _runUserCode
主要是为了抓取异常。
二、防抖 debounce 与节流 throttle
当短时间内元素激发的频率过高,当没有必要对其中的每个元素进行处理时。如何在一定时间内 滤除激活元素 ,就非常重要。最常见的是 防抖 debounce
和 节流 throttle
的处理。
1. 防抖或节流的意义
比如下面是拖拽手势触发的情况,每次触发时激活一个元素。每次激活在界面上绘制一个条纹,可以看出非常频繁。但有时并不需要这么频繁的响应,比如通过手势进行绘图时。
另外在搜索的场景,也没有必要在每次字符变化时,都请求接口查询数据。这样会造成很多不必要的访问,不仅会额外消耗流量,更会对服务器造成负担。比如用户在输入 Container
的过程中会触发 9
次字符变化,在输入过程中一般都不是用户的期望目标,会造成 8
次的额外请求。像文本自动保存这类可能频繁触发的场景,对激活元素在时间段内的限制是非常有必要的。
可能很多人会把 防抖
和 节流
搞混,其实他们之间的差异还是非常大的。通过下面的案例可以非常清楚地说明两者之间的差异:其中 throttle
和 debounce
都是 250 ms
阈值。(下图是用 Flutter 绘制的哦)
如果想要 固定间隔时间 响应激活元素,可以使用 节流 throttle
变换,在时间段之内的元素都被忽略。在滑动过程中可以看出 throttle
是间隔性触发的,会在满足阈值之后多次触发。
而对于 debounce
而言,前后元素的时间间隔差大于阈值才会触发一次。也就是说当你一直拖拽,前后元素响应时间戳都会很短,是不会触发 debounce
的。上面动图中,停顿了一下,前后间隔超过阈值,所以中间会触发一次。
2. 防抖或节流的使用
这里模拟一个场景,输入 Comtainer
字符串,每 100ms
输入一个字符:
class StreamProvider{ Stream<String> createStream() async*{ String input = "Container"; for(int i = 0 ; i < input.length; i++){ yield input.substring(0,i+1); await Future.delayed(const Duration(milliseconds: 100)); } } }
先看一下原流的情况,打印日志如下:
``` void foo1(){ Stream inStream = StreamProvider().createStream(); int start = DateTime.now().millisecondsSinceEpoch;
inStream.listen((String event) { int time = DateTime.now().millisecondsSinceEpoch - start; print('====${time} ms===输入:$event======'); }); } ```
下面使用 stream_transform
三方库中对 Stream
的拓展,来实现防抖和节流的 Stream 转换。
先看一下 节流 throttle
, 如下所示在 250 ms
的限流时间下,会忽略期间的激活元素。这样相当于在搜索过程中,会响应 3
次,可以在 搜索显示联想词 的场景下使用。
void foo2(){ Stream<String> inStream = StreamProvider().createStream(); int start = DateTime.now().millisecondsSinceEpoch; Stream<String> newStream = inStream.throttle(const Duration(milliseconds: 250)); newStream.listen((String event) { int time = DateTime.now().millisecondsSinceEpoch - start; print('====${time} ms===输入:$event======'); }); }
然后看一下 防抖 debounce
, 如下所示在 250 ms
的限流时间下。这样相当于在搜索过程中,每 100 ms
输入一个字符,由于前后元素的激活时间一种在 250 ms
之内,所以最终只会激活最后的元素。大家可以品味一下两者之间的差异。
void foo2(){ Stream<String> inStream = StreamProvider().createStream(); int start = DateTime.now().millisecondsSinceEpoch; Stream<String> newStream = inStream.debounce(const Duration(milliseconds: 250)); newStream.listen((String event) { int time = DateTime.now().millisecondsSinceEpoch - start; print('====${time} ms===输入:$event======'); }); }
搜索显示联想词 使用 debounce
也是可以的,比如用户在输入 Con
的时候停顿一下,超过 250 ms
,就会激活元素,相比而言 debounce
还更适合,它的过滤能力更强。 throttle
更适合那些需要在固定时下激活的场景,比如画板中,每个 50ms
记录一个更新点,这时 debounce
很明显不合适,因为在连续密集触发的情况下, debounce
并不会触发。
由于流的可以转换的性质,在使用 flutter_bloc
做状态管理时,就可以利用这种天然优势,一个转换方法,就能很轻松地做到防抖节流的效果,这也是我为什么非常喜欢 bloc
的理由。
三、 拖动防抖节流案例的实现
可能有很多朋友对这个小例子比较感兴趣,这里简单介绍一下实现方式:案例代码详见 async_demo
1. 线条的绘制
首先,毫无疑问,这时通过 Canvas
绘制的。其依赖的数据是 时间差列表 List<int>
, 从开始拖拽开始起,每次触发事件激活一次元素,此时的时间差就会决定线条的偏移量:
绘制逻辑非常简单,就是根据 List<int>
数据画线而已:
``` class Painter extends CustomPainter{ final List data;
Painter(this.data);
@override void paint(Canvas canvas, Size size) { Paint paint = Paint()..strokeWidth=1; for(int i=0;i
@override bool shouldRepaint(covariant Painter oldDelegate) => data!=oldDelegate.data;
} ```
2. 流的运转
这里的重点在于数据的收集工作,拿原流来说,流中的元素类型是 Signal
,一个什么都没有的对象。它只作为信号,通知流中元素的激活情况:
class Signal{}
在状态类中维护流控制器,用于向流中添加 Signal
元素。另外维护三个 List<int>
对象,作为 Painter
画板的数据;recordTime
用于记录拖拽开始的时间戳:
``` StreamController controller = StreamController(); late Stream source; List touchCollector = []; List touchCollectorT = []; List touchCollectorD = [];
@override void initState() { super.initState(); source = controller.stream.asBroadcastStream(); }
int recordTime = 0;
```
然后通过 GestureDetector
监听拖拽事件,在更新时发送 Signal
。这样数据流就可以在拖拽的过程中,动起来
了。
``` void _onUpdate(DragUpdateDetails details) { controller.add(Signal()); }
void _start(DragStartDetails details) { recordTime = DateTime.now().millisecondsSinceEpoch; touchCollector = []; touchCollectorT = []; touchCollectorD = []; } ```
3. 界面的构建
既然已经有了 Stream
,那么视图的构建自然是用 StreamBuilder
来实现。如下所示,当流中元素激活时,会触发 builder
方法,此时为 touchCollector
添加元素即可。这样,随着拖拽的进行,不断激活信号,原流中的数据将会越来越多,就能达到如下密集线条的效果:
StreamBuilder<Signal>( stream: source, builder: (_, snap) { if (snap.data != null) { int now = DateTime.now().millisecondsSinceEpoch; touchCollector = List.of(touchCollector)..add(now - recordTime); } return SizedBox( height: 120, child: CustomPaint( painter: Painter(touchCollector), ), ); }, ),
对应 throttle
而言,只要把原流通过 throttle
转换即可,这样拖拽的事件就会以指定间隔时间触发。从而避免密集的频繁响应。另外 debounce
的处理也是类似,就不赘述了。
StreamBuilder<Signal>( stream: source.throttle(const Duration(milliseconds: 250)), builder: (_, snap) { if (snap.data != null) { int now = DateTime.now().millisecondsSinceEpoch; touchCollectorT = List.of(touchCollectorT) ..add(now - recordTime); } return SizedBox( height: 120, child: CustomPaint( painter: Painter(touchCollectorT), ), ); }, ),
结语
到这里,Flutter 异步的相关知识基本上就介绍完毕,欢迎关注 《Flutter 知识进阶 - 异步编程》 专栏。 也感谢掘金平台给作者们创作 签约文章
的机会,一方面可以 免费 为读者提供更精良的文章,另一方面作者能得到平台提供的稿费。可谓两全其美,如果没有这个机会,可能我并不会去深入认识 Future
和 Stream
的源码,在创作的过程中,在源码的探索中,我也收获了很多知识。
这十篇文章,系统介绍了 Flutter
中异步的概念,探索 Future
、Stream
的使用和源码实现,以及消息处理机制、微任务循环。这是作为 Flutter
进阶必不可少的一个阶段,希望对大家的继续前行有所帮助。那本专题就到这里,有缘再见 ~
开放原子开发者工作坊旨在鼓励更多人参与开源活动,与志同道合的开发者们相互交流开发经验、分享开发心得、获取前沿技术趋势。工作坊有多种形式的开发者活动,如meetup、训练营等,主打技术交流,干货满满,真诚地邀请各位开发者共同参与!
更多推荐
所有评论(0)