f69dbd7e7483fdf629a787152051799b.png

本文来自于2019年10月15日-17日荷兰首都阿姆斯特丹举行的 SPARK + AI SUMMIT Europe 2019 会议,议题名为《Near Real Time Data Warehousing with Apache Spark and Delta Lake》,分享者 Jasper Groot。

今年数砖开源的重量级项目 Delta Lake(重磅 | Apache Spark 社区期待的 Delta Lake 开源了)已经被越来越多的公司使用,官方一直宣传 Delta Lake & Structed Streaming 是最好的搭配,本文正是介绍如何使用 Delta Lake 和 Structed Streaming 构建近实时数仓。

本文 PPT 请关注过往记忆大数据微信公众号,并回复 data_warehouse 关键字获取。本分享配套视频:

另外,本次会议的全部可下载的 PPT 和视频请访问Spark+AI Summit Europe 2019 超清视频&PPT下载。好了,我们进入正文吧。

4cbc992988a011a7cc6db10b08c6d053.png

本分享主要包括三部分

  • Structed Streaming
  • Delta Lake
  • 数据仓库

5d3c268426312dc73eac3628b84647e1.png

Structed Streaming 从 Spark 2.0 开始引入,其 API 和 DataFrame 的 API 类似。

fb7d29ea797ba40d0c25e50a5b8368dc.png

f1a0d4678f7379f41260691efb1c4646.png

739f8cc2a949682701861094fbf62397.png

b9357b9c0af6243bbbc4006dcd75e26c.png

5d3c268426312dc73eac3628b84647e1.png

4955d21003a834aaebd90ff26bb28569.png

上面都是 Structed Streaming 的基本介绍,详细可以参见 https://www.iteblog.com/archives/2084.html。下面我们来简要介绍 Delta Lake

d69ac414caf6d12ee70da04af36e4ad0.png

2e4a77b300eb754887a6c3c6f2848e48.png

c8eb6759035c3cdcd7a20e0c186aa16d.png

4eabfe3ac1f278431f7512e0b3a93ac5.png

00ad23121273a329269b02b95b5ed32f.png

5afdb8b4704d44cea64e8afa8e15158c.png

98aa2254ca04c96d949f5f75dce445e4.png

99c9d14478f91c7b2246e21f18c20b8f.png

24f2ed6b5f5592c42f82eaa5d5db9bec.png

f497dcbd91823fa38a86fd90ca653a96.png

5d3c268426312dc73eac3628b84647e1.png

26e6341b2e84e1fb6033b9f15409b25c.png

f0a37fca970fc818ecc68e3c9ca59067.png

a92c3524ff8c65210ed13eb636106d28.png

9f91a6209372836810e7ad5722ddc20a.png

前面都是关于 Delta Lake 的 ACID 是如何实现的,这个依赖于事务日志,关于这个详细的介绍可以参见过往记忆大数据的 深入理解 Apache Spark Delta Lake 的事务日志 这篇文章进行了解。

80b77edce6ad3109f8bb20ccfb68b2c0.png

随着时间的推移,磁盘中会存在大量的事务日志,Delta Lake 提供了 VACUUM 来清理过期的事务日志,默认只保存7天。VACUUM 命令会有短暂的停留,会对写有些影响。不像 update、delete、insert 等操作,VACUUM 是不记事务日志的。

下面我们来看看如何使用 VACUUM 命令:

5d3c268426312dc73eac3628b84647e1.png

到这里我们已经简要的介绍了 Structed Streaming 和 Delta Lake 是什么。下面我们来看看将这两者结合起来如何实现近实时数据仓库。

eb9351011957a4de775b50b0e1a35a35.png

7409184dff5d0c6ae95bbcc56dde864c.png

我们把 Structed Streaming 和 Delta Lake 放在一起,利用 Structed Streaming 的 DataFrame API、很好地处理迟到的数据以及可以和很多实时流数据源进行 Join。利用 Delta Lake 的 ACID 事务、事务日志以及相关文件管理,来构建数据仓库。

其实数据仓库的构建有多重方式,这里我只介绍我们采用的:星型模型、原始数据在 MySQL 中,目标数据存储在 S3中。

4fe339b86485cfb582e25e6efeb13310.png

下面是我们近实时数据仓库的架构图,分为两条流:

  • 批处理,这个使用 Spark batch 能力,直接读取 MySQL 里面的数据,然后写到 Delta Lake 数据库中;
  • 近实时处理,这个获取 MySQL 的 binlog,然后写入到 Kafka,接着使用 Structed Streaming 处理 binlog 数据最终写入到 Delta Lake 中。

写入到 Delta Lake 的数据可以用于 Adhoc 查询分析,或者使用 Spark 处理再将结果导出到 MySQL 中。

5926648689635860654fed64dff08c61.png

下面是我们表的模式以及关联关系:

73b328ea59a8d6954476732882a51291.png

CDC Capture 是 Changed Data Capture 的简称,也就是改变数据捕获,这里我们是读取 MySQL 的 Binlog 实现的,使用这个来获取表的更改操作,并写入 Kafka。比如下面我们获取到一条插入的数据。

ddbbc20a9979402acdd5ff06140edd60.png

数据到了 Kafka 之后,我们使用 Structed Streaming 读取 Kafka 的实时数据,并解析,然后写到 Delta Lake 中。

bbe8ee8d1206b911cee06715887964d3.png

0cbacc8658905ba409a3ea87cd4d10af.png

下面就是我们写到 Delta Lake 表中的数据

0d5362d7b9826e2e8e2f2f05beac3588.png

虽然上面的流程能够满足我们的业务需求,但是以下几个问题我们必须警惕:

b5477c8378070fe706a9e49b571861b8.png
  1. 实时流的计算触发(trigger)间隔比较短,这就意味着对应的 Delta Lake 表存在很多小文件,从而影响读的效率;
  2. 表的历史数据如何使用;
  3. 文件大小优化。

5d5cb2db1d444f2d4d5f363e8846d27e.png
  1. 对于 stream-to-stream joins 我们必须使用 Watermarks;
  2. 必须留意实时流数据的延迟
  3. 如果确实有数据出现延迟,而且使用 Watermarks 已经不能解决时,我们必须设置好实时流错误的触发条件。
Logo

开放原子开发者工作坊旨在鼓励更多人参与开源活动,与志同道合的开发者们相互交流开发经验、分享开发心得、获取前沿技术趋势。工作坊有多种形式的开发者活动,如meetup、训练营等,主打技术交流,干货满满,真诚地邀请各位开发者共同参与!

更多推荐