4.Doris数据导入导出
该功能可以将用户指定的表或分区的数据,以指定的文件格式,通过 Broker 进程或 S3协议/HDFS协议 导出到远端存储上,如 对象存储 / HDFS。Doris 的每一个导入作业,不论是使用 Broker Load 进行批量导入,还是使用 INSERT 语句进行单条导入,都是一个完整的事务操作。Doris支持创建外部表,创建完成之后可以通过SELECT语句直接查询外部表的数据,也可以通过INS
导入
Doris支持多种数据源导入如S3 HDFS Kafka 本地文件 Binlog 等
官方文档
导入的原子性保证
Doris 的每一个导入作业,不论是使用 Broker Load 进行批量导入,还是使用 INSERT 语句进行单条导入,都是一个完整的事务操作。导入事务可以保证一批次内的数据原子生效,不会出现部分数据写入的情况。
导入方式
导入方式分为同步与异步,同步方式直接返回导入成功或者失败;异步方式返回作业提交成功,不代表导入成功,需要通过对应命令查询作业运行状态
导入本地数据
Doris2.0支持Stream Load 与MySQL Load两种本地导入模式
Stream Load 通过HTTP协议与Doris交互
- 支持CSV 与 JSON格式 1.2+ 支持PARQUET 与 ORC 默认CSV
- BE 的 HTTP 协议端口,默认为 8040。
- FE 的 HTTP 协议端口,默认为 8030。但须保证客户端所在机器网络能够联通 BE 所在机器
PUT /api/{db}/{table}/_stream_load
-- 创建表
CREATE TABLE IF NOT EXISTS load_local_file_test
(
id INT,
age TINYINT,
name VARCHAR(50)
)
unique key(id)
DISTRIBUTED BY HASH(id) BUCKETS 3;
# 使用CURL进行数据导入
curl -u user:passwd -H "label:load_local_file_test" -T /path/to/local/demo.txt http://host:port/api/demo/load_local_file_test/_stream_load
- user:passwd 为在 Doris 中创建的用户。初始用户为 admin / root,密码初始状态下为空
- host:port 为 BE 的 HTTP 协议端口,默认是 8040,可以在 Doris 集群 WEB UI页面查看
- label: 可以在 Header 中指定 Label 唯一标识这个导入任务
{
"TxnId": 1003,
"Label": "load_local_file_test",
"Status": "Success",
"Message": "OK",
"NumberTotalRows": 1000000,
"NumberLoadedRows": 1000000,
"NumberFilteredRows": 1,
"NumberUnselectedRows": 0,
"LoadBytes": 40888898,
"LoadTimeMs": 2144,
"BeginTxnTimeMs": 1,
"StreamLoadPutTimeMs": 2,
"ReadDataTimeMs": 325,
"WriteDataTimeMs": 1933,
"CommitAndPublishTimeMs": 106,
"ErrorURL": "http://192.168.1.1:8042/api/_load_error_log?file=__shard_0/error_log_insert_stmt_db18266d4d9b4ee5-abb00ddd64bdf005_db18266d4d9b4ee5_abb00ddd64bdf005"
}
- 导入结果 Status 状态为Success为成功导入
- Stream Load 只能导入本地文件
- 建议一个导入请求的数据量控制在 1 - 2 GB 以内。如果有大量本地文件,可以分批并发提交
外部存储导入
外部存储导入支持常用的HDFS与符合S3协议的存储介质
以HDFS为例借助Broker进行导入,建议所有BE都部署上Broker
-- 基本语法
LOAD LABEL db_name.label_name
(data_desc, ...)
WITH HDFS
[PROPERTIES (key1=value1, ... )]
-
上传文件到HDFS
-
创建Doris表
CREATE TABLE IF NOT EXISTS load_hdfs_file_test ( id INT, age TINYINT, name VARCHAR(50) ) unique key(id) DISTRIBUTED BY HASH(id) BUCKETS 3;
-
从HDFS中导入数据
LOAD LABEL demo.label_20220402 ( DATA INFILE("hdfs://host:port/tmp/test_hdfs.txt") INTO TABLE `load_hdfs_file_test` COLUMNS TERMINATED BY "\t" (id,age,name) ) with HDFS ( "fs.defaultFS"="hdfs://testFs", "hdfs_user"="user" ) PROPERTIES ( "timeout"="1200", "max_filter_ratio"="0.1" );
-
查看导入状态 SHOW LOAD命令
mysql> show load order by createtime desc limit 1\G; *************************** 1. row *************************** JobId: 41326624 Label: broker_load_2022_04_15 State: FINISHED Progress: ETL:100%; LOAD:100% Type: BROKER EtlInfo: unselected.rows=0; dpp.abnorm.ALL=0; dpp.norm.ALL=27 TaskInfo: cluster:N/A; timeout(s):1200; max_filter_ratio:0.1 ErrorMsg: NULL CreateTime: 2022-04-01 18:59:06 EtlStartTime: 2022-04-01 18:59:11 EtlFinishTime: 2022-04-01 18:59:11 LoadStartTime: 2022-04-01 18:59:11 LoadFinishTime: 2022-04-01 18:59:11 URL: NULL JobDetails: {"Unfinished backends":{"5072bde59b74b65-8d2c0ee5b029adc0":[]},"ScannedRows":27,"TaskNumber":1,"All backends":{"5072bde59b74b65-8d2c0ee5b029adc0":[36728051]},"FileNumber":1,"FileSize":5540} 1 row in set (0.01 sec)
订阅kafka日志
订阅kafka日志借助了Doris Routine Load方式
用户首先需要创建一个例行导入作业。作业会通过例行调度,不断地发送一系列的任务,每个任务会消费一定数量 Kafka 中的消息
- 支持无认证的 Kafka 访问,以及通过 SSL 方式认证的 Kafka 集群。
- 支持的消息格式如下:
- csv 文本格式。每一个 message 为一行,且行尾不包含换行符。
- Json 格式,详见 导入 Json 格式数据。
- 仅支持 Kafka 0.10.0.0(含) 以上版本。
-- 语法
CREATE ROUTINE LOAD [db.]job_name [ON tbl_name]
[merge_type]
[load_properties]
[job_properties]
FROM data_source [data_source_properties]
[COMMENT "comment"]
-- 示例
CREATE ROUTINE LOAD demo.my_first_routine_load_job ON test_1
COLUMNS TERMINATED BY ","
PROPERTIES
(
"max_batch_interval" = "20",
"max_batch_rows" = "300000",
"max_batch_size" = "209715200",
)
FROM KAFKA
(
"kafka_broker_list" = "broker1:9092,broker2:9092,broker3:9092",
"kafka_topic" = "my_topic",
"property.group.id" = "xxx",
"property.client.id" = "xxx",
"property.kafka_default_offsets" = "OFFSET_BEGINNING"
);
-- 查看作业运行状态 SHOW ROUTINE LOAD
SHOW [ALL] ROUTINE LOAD [FOR jobName];
Id: 作业ID
Name: 作业名称
CreateTime: 作业创建时间
PauseTime: 最近一次作业暂停时间
EndTime: 作业结束时间
DbName: 对应数据库名称
TableName: 对应表名称 (多表的情况下由于是动态表,因此不显示具体表名,我们统一显示 multi-table )
IsMultiTbl: 是否为多表
State: 作业运行状态
DataSourceType: 数据源类型:KAFKA
CurrentTaskNum: 当前子任务数量
JobProperties: 作业配置详情
DataSourceProperties: 数据源配置详情
CustomProperties: 自定义配置
Statistic: 作业运行状态统计信息
Progress: 作业运行进度
Lag: 作业延迟状态
ReasonOfStateChanged: 作业状态变更的原因
ErrorLogUrls: 被过滤的质量不合格的数据的查看地址
OtherMsg: 其他错误信息
- 查看导入作业状态
查看作业状态的具体命令和示例请参阅 SHOW ROUTINE LOAD 命令文档。
查看某个作业的任务运行状态的具体命令和示例请参阅 SHOW ROUTINE LOAD TASK 命令文档。
只能查看当前正在运行中的任务,已结束和未开始的任务无法查看。
- 修改作业属性
用户可以修改已经创建的作业的部分属性。具体说明请参阅 ALTER ROUTINE LOAD 命令手册。
- 作业控制
用户可以通过
STOP/PAUSE/RESUME
三个命令来控制作业的停止,暂停和重启。具体命令请参阅 STOP ROUTINE LOAD,PAUSE ROUTINE LOAD,RESUME ROUTINE LOAD 命令文档。
通过外部表同步数据
Doris支持创建外部表,创建完成之后可以通过SELECT语句直接查询外部表的数据,也可以通过INSERT INTO SELECT 的方式导入外部表的数据
Doris 外部表目前支持的数据源包括:
- MySQL
- Oracle
- PostgreSQL
- SQLServer
- Hive
- Iceberg
- ElasticSearch
-
创建一个ODBC的外部表 创建 ODBC 外部表的详细介绍请参阅 CREATE EXTERNAL TABLE 语法帮助手册
CREATE EXTERNAL RESOURCE `oracle_test_odbc` PROPERTIES ( "type" = "odbc_catalog", "host" = "192.168.0.10", "port" = "8086", "user" = "oracle", "password" = "oracle", "database" = "oracle", "odbc_type" = "oracle", "driver" = "Oracle" );
-
创建外部表
CREATE EXTERNAL TABLE `ext_oracle_demo` ( `k1` decimal(9, 3) NOT NULL COMMENT "", `k2` char(10) NOT NULL COMMENT "", `k3` datetime NOT NULL COMMENT "", `k5` varchar(20) NOT NULL COMMENT "", `k6` double NOT NULL COMMENT "" ) ENGINE=ODBC COMMENT "ODBC" PROPERTIES ( "odbc_catalog_resource" = "oracle_test_odbc", "database" = "oracle", "table" = "baseall" );
-
创建Doris表
CREATE TABLE `doris_oralce_tbl` ( `k1` decimal(9, 3) NOT NULL COMMENT "", `k2` char(10) NOT NULL COMMENT "", `k3` datetime NOT NULL COMMENT "", `k5` varchar(20) NOT NULL COMMENT "", `k6` double NOT NULL COMMENT "" ) COMMENT "Doris Table" DISTRIBUTED BY HASH(k1) BUCKETS 2 PROPERTIES ( "replication_num" = "1" );
-
导入数据 此时 INSERT INTO SELECT是一个同步命令
INSERT INTO doris_oralce_tbl SELECT k1,k2,k3 FROM ext_oracle_demo limit 100;
- 必须保证外部数据源与 Doris 集群是可以互通,包括BE节点和外部数据源的网络是互通的。
- ODBC 外部表本质上是通过单一 ODBC 客户端访问数据源,因此并不合适一次性导入大量的数据,建议分批多次导入。
数据导出
异步导出(Export)是 Doris 提供的一种将数据异步导出的功能。该功能可以将用户指定的表或分区的数据,以指定的文件格式,通过 Broker 进程或 S3协议/HDFS协议 导出到远端存储上,如 对象存储 / HDFS
EXPORT支持导出 Doris本地表 / View视图 / 外表,支持导出到 parquet / orc / csv / csv_with_names / csv_with_names_and_types 文件格式
执行步骤
- 用户提交一个 Export 作业到 FE。
- FE会统计要导出的所有Tablets,然后根据
parallelism
参数将所有Tablets分组,每一组再根据maximum_number_of_export_partitions
参数生成若干个SELECT INTO OUTFILE
查询计划 - 根据
parallelism
参数,生成相同个数的ExportTaskExecutor
,每一个ExportTaskExecutor
由一个线程负责,线程由FE的Job 调度框架去调度执行。 - FE的Job调度器会去调度
ExportTaskExecutor
并执行,每一个ExportTaskExecutor
会串行地去执行由它负责的若干个SELECT INTO OUTFILE
查询计划
语法
EXPORT TABLE db1.tbl1
PARTITION (p1,p2)
[WHERE [expr]]
TO "hdfs://host/path/to/export/"
PROPERTIES
(
"label" = "mylabel",
"column_separator"=",",
"columns" = "col1,col2",
"parallelusm" = "3"
)
WITH BROKER "hdfs"
(
"username" = "user",
"password" = "passwd"
);
label
:本次导出作业的标识。后续可以使用这个标识查看作业状态。column_separator
:列分隔符。默认为\t
。支持不可见字符,比如 ‘\x07’。columns
:要导出的列,使用英文状态逗号隔开,如果不填这个参数默认是导出表的所有列。line_delimiter
:行分隔符。默认为\n
。支持不可见字符,比如 ‘\x07’。parallelusm
:并发3个线程去导出
查看导出状态
mysql> show EXPORT\G;
*************************** 1. row ***************************
JobId: 14008
State: FINISHED
Progress: 100%
TaskInfo: {"partitions":[],"max_file_size":"","delete_existing_files":"","columns":"","format":"csv","column_separator":"\t","line_delimiter":"\n","db":"default_cluster:demo","tbl":"student4","tablet_num":30}
Path: hdfs://host/path/to/export/
CreateTime: 2019-06-25 17:08:24
StartTime: 2019-06-25 17:08:28
FinishTime: 2019-06-25 17:08:34
Timeout: 3600
ErrorMsg: NULL
OutfileInfo: [
[
{
"fileNumber": "1",
"totalRows": "4",
"fileSize": "34bytes",
"url": "file:///127.0.0.1/Users/fangtiewei/tmp_data/export/f1ab7dcc31744152-bbb4cda2f5c88eac_"
}
]
]
1 row in set (0.01 sec)
JobId:作业的唯一 ID
State:作业状态:
- PENDING:作业待调度
- EXPORTING:数据导出中
- FINISHED:作业成功
- CANCELLED:作业失败
Progress:作业进度。该进度以查询计划为单位。假设一共 10 个线程,当前已完成 3 个,则进度为 30%。
TaskInfo:以 Json 格式展示的作业信息:
- db:数据库名
- tbl:表名
- partitions:指定导出的分区。
空
列表 表示所有分区。- column_separator:导出文件的列分隔符。
- line_delimiter:导出文件的行分隔符。
- tablet num:涉及的总 Tablet 数量。
- broker:使用的 broker 的名称。
- coord num:查询计划的个数。
- max_file_size:一个导出文件的最大大小。
- delete_existing_files:是否删除导出目录下已存在的文件及目录。
- columns:指定需要导出的列名,空值代表导出所有列。
- format:导出的文件格式
Path:远端存储上的导出路径。
CreateTime/StartTime/FinishTime:作业的创建时间、开始调度时间和结束时间。
Timeout:作业超时时间。单位是秒。该时间从 CreateTime 开始计算。
ErrorMsg:如果作业出现错误,这里会显示错误原因。
OutfileInfo:如果作业导出成功,这里会显示具体的
SELECT INTO OUTFILE
结果信息
开放原子开发者工作坊旨在鼓励更多人参与开源活动,与志同道合的开发者们相互交流开发经验、分享开发心得、获取前沿技术趋势。工作坊有多种形式的开发者活动,如meetup、训练营等,主打技术交流,干货满满,真诚地邀请各位开发者共同参与!
更多推荐
所有评论(0)