Airbyte分享
Airbyte,一家专注于 ELT 管道的开源数据集成平台,Airbyte 最主要的产品还是 Extract 数据抽取和 Load 数据加载产品。简单来说,就是利用连接器 (Connector) 连通多平台间的数据,其逻辑是平台连接的数据源越多,平台越稳定,而平台就会拥有壁垒。其次,Airbyte 也提供 Transform (数据转换) 产品,实际上 Transform 数据转换产品 Airby
一、Airbyte?
Airbyte公司基本情况
-
成立时间:2020年年中
-
创始人:Michel Tricot & John Lafleur
-
总部:公司总部位于旧金山
-
融资规模:超过1.5亿美元
-
融资轮次:B系列融资
-
最新估值:总资金为1.812亿美元,估值为15亿美元
-
投资机构:Thrive Capital、Salesforce Ventures、Benchmark、Accel和SV Angel等
Airbyte是做什么的?
-
简单来讲,airbyte是做数据集成和连接的。将应用程序、API和数据库中的数据同步到数据仓库、数据湖和其他目的地
-
支持200个Source类型连接器,100 个Destination类型的连接器
-
2021年,9000多家公司使用Airbyte从PostgreSQL、Oracle、MySQL、Facebook广告、Salesforce、Stripe等来源同步数据,并连接到Redshift、Snowflake、Databricks和BigQuery等目的地
-
社区:拥有4500名数据从业者和200名贡献者
-
预计到2022年底将有500个高质量连接器且涵盖更多类型的数据移动,包括反向ETL和流式接收
Airbyte解决了什么问题?
-
第一:公司总是必须自己构建和维护数据连接器,因为大多数不太流行的“长尾”数据连接器不受封闭源ELT技术的支持。
-
第二:数据团队通常必须围绕预建连接器进行定制工作,以使其在其独特的数据基础架构中工作。
二、整体架构
Airbyte一些核心概念
-
Airbyte Connector——连接器
-
在Airbyte的概念中,connector或者是收集从数据源推送过来的数据,或者去跟数据源发送请求去抓取数据。
-
Airbyte规定每个connector都放在一个完整的docker镜像中
-
Airbyte的connector的类型如下图:
-
-
Data Types——数据类型
-
实际上内部转换过程中只有:String/Number两种数据类型
column ab_name (type TEXT[65535]) -> JsonSchemaType({type=string}) column ab_date (type TEXT[65535]) -> JsonSchemaType({type=string}) column ab_time (type TEXT[65535]) -> JsonSchemaType({type=string}) column ab_datetime (type TEXT[65535]) -> JsonSchemaType({type=string}) column ab_salary (type DOUBLE[22]) -> JsonSchemaType({type=number})
-
Airbyte整体架构
框架总体介绍:
-
ui :进行配置,以及通过界面查看同步日志
-
config store :关于认证&&同步信息存储;存储在数据库中,通过jooq(一个对象关系映射的轻量级框架,在sql执行方面的灵活性和对数据包装的严谨性方面都很优秀)存取操作
-
scheduler store: 存储关于调度执行情况;同jooq
-
config api :方便ui 进行连接配置
-
scheduler api :进行调度job 配置
-
scheduler :进行数据任务的调度编排以及状态追踪
-
worker :具体数据从source 到sink 负责将数据从源容器移动到目标容器。
-
管理此过程的整个生命周期。这包括:
-
启动源和目标容器
-
将数据从源传递到目标,在源和目标之间执行任何配置的仅映射操作(映射器)
-
收集关于从源传递到目标的数据的元数据
-
侦听从目标发出的状态消息,以跟踪所发生的情况数据已复制
-
处理源和目标的关闭
-
处理故障案例并返回部分完成复制的状态(以便下一个复制可以从停止的地方开始,而不是从开始处开始)
-
-
-
tep store 临时存储(需要数据写入磁盘的场景)
数据同步的整体流程
MySQL2MySQL
大致流程:
- UI界面配置connector,connector信息入库
- 客户端发起同步请求
- 客户端将请求转发到后端,调度封装环境信息、加载作业配置信息、提交同步作业到Worker
- 启动worker,另起两个线程,分别启动source和destination容器,将作业json内容分发到容器中
- worker启动同步线程,执行同步
-
为每个流创建一个临时表
-
在缓冲区中累积记录。每个流一个缓冲区。
-
随着记录的积累,将其批量刷新数据库。
-
将所有记录写入缓冲区后,刷新缓冲区并将所有剩余记录写入数据库(无论剩下多少记录)
-
当记录被“刷新”时,它将从docker容器移动到目标端
-
按照惯例,这些被刷新的记录通常被放置在destination的某种临时仓库中(例如临时数据库或文件存储)
-
在最后close中,处理提交:这时候表示数据到最终存储(例如最终表)。也就是flush将数据推入云存储中的暂存区域,然后从临时表复制到最终表,然后关闭暂存副本
-
-
-
在单个事务中,删除目标表(如果存在),并将临时表重命名为最终表名。
-
健康状态上报,同步结果信息上报
String.format("SELECT %s FROM %s", enquoteIdentifierList(columnNames), getFullTableName(schemaName, tableName)));
connector配置信息
{
"connectionId": "c3688b34-6704-438c-b2e2-4f557e248b29",
"name": "mysql2mysql_one",
"namespaceDefinition": "destination",
"namespaceFormat": "${SOURCE_NAMESPACE}",
"prefix": "cool",
"sourceId": "75bdbcc3-3650-4c81-bc21-9405ba7d2388",
"destinationId": "42005a16-cb12-4f0b-a3ae-022ce93b9788",
"syncCatalog": {
"streams": [{
"stream": {
"name": "employee",
"jsonSchema": {
"type": "object",
"properties": {
"ab_id": {
"type": "number"
},
"ab_date": {
"type": "string"
},
"ab_name": {
"type": "string"
},
"ab_time": {
"type": "string"
},
"ab_salary": {
"type": "number"
},
"ab_datetime": {
"type": "string"
}
}
},
"supportedSyncModes": ["full_refresh", "incremental"],
"defaultCursorField": [],
"sourceDefinedPrimaryKey": [],
"namespace": "airbyte_source"
},
"config": {
"syncMode": "full_refresh",
"cursorField": [],
"destinationSyncMode": "overwrite",
"primaryKey": [],
"aliasName": "employee",
"selected": true
}
}]
},
"status": "active",
"operationIds": ["806978ac-f2ab-44c6-b358-6daedeac38cc"],
"source": {
"sourceDefinitionId": "435bb9a5-7887-4809-aa58-28c27df0d7ad",
"sourceId": "75bdbcc3-3650-4c81-bc21-9405ba7d2388",
"workspaceId": "1bd5b9e2-f747-49e8-a4cb-042d2b278557",
"connectionConfiguration": {
"ssl": true,
"host": "localhost",
"port": 3308,
"database": "airbyte_source",
"password": "**********",
"username": "root",
"tunnel_method": {
"tunnel_method": "NO_TUNNEL"
},
"replication_method": "STANDARD"
},
"name": "mysql_source_one",
"sourceName": "MySQL"
},
"destination": {
"destinationDefinitionId": "ca81ee7c-3163-4246-af40-094cc31e5e42",
"destinationId": "42005a16-cb12-4f0b-a3ae-022ce93b9788",
"workspaceId": "1bd5b9e2-f747-49e8-a4cb-042d2b278557",
"connectionConfiguration": {
"ssl": true,
"host": "localhost",
"port": 3308,
"database": "airbyte_destination",
"password": "**********",
"username": "root",
"tunnel_method": {
"tunnel_method": "NO_TUNNEL"
}
},
"name": "mysql_destination_one",
"destinationName": "MySQL"
},
"operations": [{
"workspaceId": "1bd5b9e2-f747-49e8-a4cb-042d2b278557",
"operationId": "806978ac-f2ab-44c6-b358-6daedeac38cc",
"name": "Normalization",
"operatorConfiguration": {
"operatorType": "normalization",
"normalization": {
"option": "basic"
}
}
}],
"latestSyncJobCreatedAt": 1657021788,
"latestSyncJobStatus": "succeeded",
"isSyncing": false,
"catalogId": "9074e5a7-dcab-4ba0-956c-891f6b118486"
}
三、流程演示
配置同步作业
- 1、从Airbyte支持的“Sources”中选择想要连接的数据源,并配置相应信息
-
2、从Airbyte支持的“Destinations”中选择数据
-
3、刷新数据源schema
-
4、数据同步
-
5、结果展示
-
6、10w数据同步测试
四、ETL-->EL
Airbyte 集成了 dbt,到底哪里集成的。
-
Airbyte的理念是主要做Extract (Source) + Load (Destination),所以,中间的逻辑要尽可能简单。当写入目标后,其实是更原始的格式,比如上面的结果展示
-
数据是较“原汁原味”的保存到了数仓中,虽然是以JSON列的形式保存的。要分析其数据,一般还需要调用类似 get_json_object 等函数来把这些JSON转为更多列。
-
为了简化这个步骤,对于一些常见数仓,airbyte支持“Basic Normalization”的操作,这步骤就是借助了dbt来为这些数仓做转化。
-
当选择的是Raw data(JSON)模式,那么数据入库就是以JSON的形式存在于库中
source
destination:
-
当选择的是Normalized tabular data模式,那在Sync好数据后,会调用dbt来做数据转
-
dbt相关的代码可以看: https://github.com/airbytehq/airbyte/tree/master/airbyte-integrations/bases/base-normalization/dbt-project-template/macros 目录
source:
destination:
- 由结果展示可知,Airbyte会改变数据库表的schema,同步过程中,新增自定义的三个字段在destination中:
驱动加载
-
MySQL作为例子,从源码角度看,JDBC驱动加载连接器访问关系数据库;支持JDBC驱动的数据库如下
CLICKHOUSE("ru.yandex.clickhouse.ClickHouseDriver", "jdbc:clickhouse://%s:%d/%s"),
DATABRICKS("com.databricks.client.jdbc.Driver","jdbc:databricks://%s;HttpPath=%s;UserAgentEntry=Airbyte"),
DB2("com.ibm.db2.jcc.DB2Driver", "jdbc:db2://%s:%d/%s"),
MARIADB("org.mariadb.jdbc.Driver", "jdbc:mariadb://%s:%d/%s"),
MSSQLSERVER("com.microsoft.sqlserver.jdbc.SQLServerDriver", "jdbc:sqlserver://%s:%d/%s"),
MYSQL("com.mysql.cj.jdbc.Driver", "jdbc:mysql://%s:%d/%s"),
ORACLE("oracle.jdbc.OracleDriver", "jdbc:oracle:thin:@%s:%d/%s"),
POSTGRESQL("org.postgresql.Driver", "jdbc:postgresql://%s:%d/%s"),
REDSHIFT("com.amazon.redshift.jdbc.Driver", "jdbc:redshift://%s:%d/%s"),
SNOWFLAKE("net.snowflake.client.jdbc.SnowflakeDriver", "jdbc:snowflake://%s/");
同步模式
-
完全刷新覆盖:同步整个流并通过覆盖替换目标中的数据。
-
覆盖:通过首先删除目标中的现有数据进行覆盖。
-
-
完全刷新追加:同步整个流并在目标中追加数据。
-
追加:通过向目标中的现有表添加数据进行写入
-
-
增量追加:同步流中的新记录并在目标中追加数据。
-
增量重复数据消除历史记录:从流中同步新记录并在目标中追加数据,还提供了一个重复数据消除视图,镜像源中流的状态。
五、参考资料
-
https://github.com/airbytehq/airbyte
开放原子开发者工作坊旨在鼓励更多人参与开源活动,与志同道合的开发者们相互交流开发经验、分享开发心得、获取前沿技术趋势。工作坊有多种形式的开发者活动,如meetup、训练营等,主打技术交流,干货满满,真诚地邀请各位开发者共同参与!
更多推荐
所有评论(0)