一、Airbyte?

Airbyte公司基本情况

  • 成立时间:2020年年中

  • 创始人:Michel Tricot & John Lafleur

  • 总部:公司总部位于旧金山

  • 融资规模:超过1.5亿美元

  • 融资轮次:B系列融资

  • 最新估值:总资金为1.812亿美元,估值为15亿美元

  • 投资机构:Thrive Capital、Salesforce Ventures、Benchmark、Accel和SV Angel等

Airbyte是做什么的?

  • 简单来讲,airbyte是做数据集成和连接的。将应用程序、API和数据库中的数据同步到数据仓库、数据湖和其他目的地

  • 支持200个Source类型连接器,100 个Destination类型的连接器

  • 2021年,9000多家公司使用Airbyte从PostgreSQL、Oracle、MySQL、Facebook广告、Salesforce、Stripe等来源同步数据,并连接到Redshift、Snowflake、Databricks和BigQuery等目的地

  • 社区:拥有4500名数据从业者和200名贡献者

  • 预计到2022年底将有500个高质量连接器且涵盖更多类型的数据移动,包括反向ETL和流式接收

Airbyte解决了什么问题?

  • 第一:公司总是必须自己构建和维护数据连接器,因为大多数不太流行的“长尾”数据连接器不受封闭源ELT技术的支持。

  • 第二:数据团队通常必须围绕预建连接器进行定制工作,以使其在其独特的数据基础架构中工作。

二、整体架构

Airbyte一些核心概念

  • Airbyte Connector——连接器

    • 在Airbyte的概念中,connector或者是收集从数据源推送过来的数据,或者去跟数据源发送请求去抓取数据。

    • Airbyte规定每个connector都放在一个完整的docker镜像中

    • Airbyte的connector的类型如下图:

  • Data Types——数据类型

    • 实际上内部转换过程中只有:String/Number两种数据类型

      column ab_name (type TEXT[65535]) -> JsonSchemaType({type=string}) 
      column ab_date (type TEXT[65535]) -> JsonSchemaType({type=string}) 
      column ab_time (type TEXT[65535]) -> JsonSchemaType({type=string}) 
      column ab_datetime (type TEXT[65535]) -> JsonSchemaType({type=string}) 
      column ab_salary (type DOUBLE[22]) -> JsonSchemaType({type=number})

Airbyte整体架构

框架总体介绍:

  • ui :进行配置,以及通过界面查看同步日志

  • config store :关于认证&&同步信息存储;存储在数据库中,通过jooq(一个对象关系映射的轻量级框架,在sql执行方面的灵活性和对数据包装的严谨性方面都很优秀)存取操作

  • scheduler store: 存储关于调度执行情况;同jooq

  • config api :方便ui 进行连接配置

  • scheduler api :进行调度job 配置

  • scheduler :进行数据任务的调度编排以及状态追踪

  • worker :具体数据从source 到sink 负责将数据从源容器移动到目标容器。

    • 管理此过程的整个生命周期。这包括:

      • 启动源和目标容器

      • 将数据从源传递到目标,在源和目标之间执行任何配置的仅映射操作(映射器)

      • 收集关于从源传递到目标的数据的元数据

      • 侦听从目标发出的状态消息,以跟踪所发生的情况数据已复制

      • 处理源和目标的关闭

      • 处理故障案例并返回部分完成复制的状态(以便下一个复制可以从停止的地方开始,而不是从开始处开始)

  • tep store 临时存储(需要数据写入磁盘的场景)

数据同步的整体流程

MySQL2MySQL

大致流程:

  1. UI界面配置connector,connector信息入库
  2. 客户端发起同步请求
  3. 客户端将请求转发到后端,调度封装环境信息、加载作业配置信息、提交同步作业到Worker
  4. 启动worker,另起两个线程,分别启动source和destination容器,将作业json内容分发到容器中
  5. worker启动同步线程,执行同步
    1. 为每个流创建一个临时表

    2. 在缓冲区中累积记录。每个流一个缓冲区。

    3. 随着记录的积累,将其批量刷新数据库。

    4. 将所有记录写入缓冲区后,刷新缓冲区并将所有剩余记录写入数据库(无论剩下多少记录)

      1. 当记录被“刷新”时,它将从docker容器移动到目标端

      2. 按照惯例,这些被刷新的记录通常被放置在destination的某种临时仓库中(例如临时数据库或文件存储)

      3. 在最后close中,处理提交:这时候表示数据到最终存储(例如最终表)。也就是flush将数据推入云存储中的暂存区域,然后从临时表复制到最终表,然后关闭暂存副本

  6. 在单个事务中,删除目标表(如果存在),并将临时表重命名为最终表名。

  7. 健康状态上报,同步结果信息上报

String.format("SELECT %s FROM %s", enquoteIdentifierList(columnNames), getFullTableName(schemaName, tableName)));

connector配置信息

{
        "connectionId": "c3688b34-6704-438c-b2e2-4f557e248b29",
        "name": "mysql2mysql_one",
        "namespaceDefinition": "destination",
        "namespaceFormat": "${SOURCE_NAMESPACE}",
        "prefix": "cool",
        "sourceId": "75bdbcc3-3650-4c81-bc21-9405ba7d2388",
        "destinationId": "42005a16-cb12-4f0b-a3ae-022ce93b9788",
        "syncCatalog": {
                "streams": [{
                        "stream": {
                                "name": "employee",
                                "jsonSchema": {
                                        "type": "object",
                                        "properties": {
                                                "ab_id": {
                                                        "type": "number"
                                                },
                                                "ab_date": {
                                                        "type": "string"
                                                },
                                                "ab_name": {
                                                        "type": "string"
                                                },
                                                "ab_time": {
                                                        "type": "string"
                                                },
                                                "ab_salary": {
                                                        "type": "number"
                                                },
                                                "ab_datetime": {
                                                        "type": "string"
                                                }
                                        }
                                },
                                "supportedSyncModes": ["full_refresh", "incremental"],
                                "defaultCursorField": [],
                                "sourceDefinedPrimaryKey": [],
                                "namespace": "airbyte_source"
                        },
                        "config": {
                                "syncMode": "full_refresh",
                                "cursorField": [],
                                "destinationSyncMode": "overwrite",
                                "primaryKey": [],
                                "aliasName": "employee",
                                "selected": true
                        }
                }]
        },
        "status": "active",
        "operationIds": ["806978ac-f2ab-44c6-b358-6daedeac38cc"],
        "source": {
                "sourceDefinitionId": "435bb9a5-7887-4809-aa58-28c27df0d7ad",
                "sourceId": "75bdbcc3-3650-4c81-bc21-9405ba7d2388",
                "workspaceId": "1bd5b9e2-f747-49e8-a4cb-042d2b278557",
                "connectionConfiguration": {
                        "ssl": true,
                        "host": "localhost",
                        "port": 3308,
                        "database": "airbyte_source",
                        "password": "**********",
                        "username": "root",
                        "tunnel_method": {
                                "tunnel_method": "NO_TUNNEL"
                        },
                        "replication_method": "STANDARD"
                },
                "name": "mysql_source_one",
                "sourceName": "MySQL"
        },
        "destination": {
                "destinationDefinitionId": "ca81ee7c-3163-4246-af40-094cc31e5e42",
                "destinationId": "42005a16-cb12-4f0b-a3ae-022ce93b9788",
                "workspaceId": "1bd5b9e2-f747-49e8-a4cb-042d2b278557",
                "connectionConfiguration": {
                        "ssl": true,
                        "host": "localhost",
                        "port": 3308,
                        "database": "airbyte_destination",
                        "password": "**********",
                        "username": "root",
                        "tunnel_method": {
                                "tunnel_method": "NO_TUNNEL"
                        }
                },
                "name": "mysql_destination_one",
                "destinationName": "MySQL"
        },
        "operations": [{
                "workspaceId": "1bd5b9e2-f747-49e8-a4cb-042d2b278557",
                "operationId": "806978ac-f2ab-44c6-b358-6daedeac38cc",
                "name": "Normalization",
                "operatorConfiguration": {
                        "operatorType": "normalization",
                        "normalization": {
                                "option": "basic"
                        }
                }
        }],
        "latestSyncJobCreatedAt": 1657021788,
        "latestSyncJobStatus": "succeeded",
        "isSyncing": false,
        "catalogId": "9074e5a7-dcab-4ba0-956c-891f6b118486"
}

三、流程演示

配置同步作业

  • 1、从Airbyte支持的“Sources”中选择想要连接的数据源,并配置相应信息 

  • 2、从Airbyte支持的“Destinations”中选择数据

  • 3、刷新数据源schema

  • 4、数据同步

  • 5、结果展示
     

  • 6、10w数据同步测试

四、ETL-->EL

Airbyte 集成了 dbt,到底哪里集成的。

  • Airbyte的理念是主要做Extract (Source) + Load (Destination),所以,中间的逻辑要尽可能简单。当写入目标后,其实是更原始的格式,比如上面的结果展示

  • 数据是较“原汁原味”的保存到了数仓中,虽然是以JSON列的形式保存的。要分析其数据,一般还需要调用类似 get_json_object 等函数来把这些JSON转为更多列。

  • 为了简化这个步骤,对于一些常见数仓,airbyte支持“Basic Normalization”的操作,这步骤就是借助了dbt来为这些数仓做转化。

  • 当选择的是Raw data(JSON)模式,那么数据入库就是以JSON的形式存在于库中

source

destination:

source:

destination:

  •  由结果展示可知,Airbyte会改变数据库表的schema,同步过程中,新增自定义的三个字段在destination中:

驱动加载

  • MySQL作为例子,从源码角度看,JDBC驱动加载连接器访问关系数据库;支持JDBC驱动的数据库如下

CLICKHOUSE("ru.yandex.clickhouse.ClickHouseDriver", "jdbc:clickhouse://%s:%d/%s"), 
DATABRICKS("com.databricks.client.jdbc.Driver","jdbc:databricks://%s;HttpPath=%s;UserAgentEntry=Airbyte"),
DB2("com.ibm.db2.jcc.DB2Driver", "jdbc:db2://%s:%d/%s"), 
MARIADB("org.mariadb.jdbc.Driver", "jdbc:mariadb://%s:%d/%s"), 
MSSQLSERVER("com.microsoft.sqlserver.jdbc.SQLServerDriver", "jdbc:sqlserver://%s:%d/%s"), 
MYSQL("com.mysql.cj.jdbc.Driver", "jdbc:mysql://%s:%d/%s"), 
ORACLE("oracle.jdbc.OracleDriver", "jdbc:oracle:thin:@%s:%d/%s"), 
POSTGRESQL("org.postgresql.Driver", "jdbc:postgresql://%s:%d/%s"), 
REDSHIFT("com.amazon.redshift.jdbc.Driver", "jdbc:redshift://%s:%d/%s"), 
SNOWFLAKE("net.snowflake.client.jdbc.SnowflakeDriver", "jdbc:snowflake://%s/");

同步模式

  • 完全刷新覆盖:同步整个流并通过覆盖替换目标中的数据。

    • 覆盖:通过首先删除目标中的现有数据进行覆盖。

  • 完全刷新追加:同步整个流并在目标中追加数据。

    • 追加:通过向目标中的现有表添加数据进行写入

  • 增量追加:同步流中的新记录并在目标中追加数据。

  • 增量重复数据消除历史记录:从流中同步新记录并在目标中追加数据,还提供了一个重复数据消除视图,镜像源中流的状态。

五、参考资料

  • https://github.com/airbytehq/airbyte

Logo

开放原子开发者工作坊旨在鼓励更多人参与开源活动,与志同道合的开发者们相互交流开发经验、分享开发心得、获取前沿技术趋势。工作坊有多种形式的开发者活动,如meetup、训练营等,主打技术交流,干货满满,真诚地邀请各位开发者共同参与!

更多推荐