Sqoop简介

Sqoop是一款开源的工具,主要用于在Hadoop与传统的数据库间进行数据的传递,可以将一个关系型数据库中的数据导进到Hadoop的HDFS中,也可以将HDFS的数据导进到关系型数据库中。

常用参数
参数描述
–connect < jdbc-uri >JDBC连接地址
–username < username >设置连接用户名
–password < password >设置连接密码
–connection-manager < class-name >指定要使用的连接管理器类
–driver < class-name >手动指定要使用的 JDBC 驱动程序类
–hadoop-mapred-home < dir >覆盖 $HADOOP_MAPRED_HOME
–help打印使用说明
–password-file设置包含身份验证密码的文件的路径
-P从控制台读取密码
–verbose工作时打印更多信息
–connection-param-file < filename >提供连接参数的可选属性文件
–relaxed-isolation将连接事务隔离设置为读取未提交的映射器。

导入数据

sqoop-import

import可以将单个表从 RDBMS 导入到 HDFS。表中的每一行都表示为 HDFS 中的单独记录。记录可以存储为文本文件(每行一条记录),或以二进制表示形式存储为 Avro 或 SequenceFiles。

语法
$ sqoop import (generic-args) (import-args)
$ sqoop-import (generic-args) (import-args)
导入控制参数
参数描述
–append将数据追加到 HDFS 中的现有数据集
–as-avrodatafile将数据导入 Avro 数据文件
–as-sequencefile将数据导入 Sequence 数据文件
–as-textfile以纯文本形式导入数据(默认)
–as-parquetfile将数据导入 Parquet 文件
–boundary-query < statement >用于创建拆分的边界查询
–columns < col,col,col… >指定要从表中导入的列
–delete-target-dir删除导入目标目录(如果存在)
–direct如果数据库存在,则使用直连式连接器
–fetch-size < n >指定一次从数据库中读取的条目数。
–inline-lob-limit < n >设置内联 LOB 的最大大小
-m, --num-mappers < n >使用n个map任务并行导入,控制导入并行度
-e, --query < statement >导入statement的结果
–split-by < column-name >用于拆分工作单元的表格列。不能与 --autoreset-to-one-mapper选项一起使用。
–autoreset-to-one-mapper如果表没有主键并且没有提供拆分列,则导入时应该使用一个映射器。不能与 --split-by < col >选项一起使用。
–table < table-name >指定要导入的表
–target-dir < dir >数据导入到hdfs中的目标目录
–warehouse-dir < dir >数据导入到hdfs中的目标目录的父级
–where < where clause >导入期间使用的 WHERE 子句
-z,–compress启用压缩
–compression-codec < c >使用 Hadoop 编解码器(默认 gzip)
–null-string < null-string >为字符串列的空值写入的字符串
–null-non-string < null-string >为非字符串列的空值写入的字符串
–enclosed-by < char >设置必填字段封闭字符
–escaped-by < char >设置转义字符
–fields-terminated-by < char >设置字段分隔符
–lines-terminated-by < char >设置行尾字符
–mysql-delimiters使用 MySQL 的默认分隔符集: fields: , lines: \n escaped-by: \ optional-enclosed-by:’
–optionally-enclosed-by < char >设置字段封闭字符
–check-column (col)指定在确定要导入哪些行时要检查的列。(该列的类型不应为 CHAR/NCHAR/VARCHAR/VARNCHAR/LONGVARCHAR/LONGNVARCHAR)
–incremental (mode)指定 Sqoop 如何确定哪些行是新的。mode includeappend和的合法值lastmodified。
–last-value (value)指定上次导入的检查列的最大值。
数据导入

从关系型数据库到HDFS

// sqoop可执行文件的目录
$ cd /opt/usdp-srv/srv/udp/2.0.0.0/sqoop/bin/

// 整表导入
$ sqoop import \
 --connect jdbc:mysql://192.168.12.211:3306/xl_vip \
 --username root \
 --password root \
 --table goods_order \
 --target-dir /user/test/sqoop_goods_order \
 --delete-target-dir \
 --num-mappers 1
 --fields-terminated-by "\t"

// 根据sql查询结果集导入
$ sqoop import \
 --connect jdbc:mysql://192.168.12.211:3306/xl_vip \
 --username root \
 --password root \
 --query 'select * from goods_order where market_id = 11 and $CONDITIONS;' \
 --target-dir /user/test/sqoop_goods_order \
 --delete-target-dir \
 -m 1

从关系型数据库到Hive

参数描述
–hive-home < dir >覆盖$HIVE_HOME
–hive-import将表导入 Hive(如果未设置,则使用 Hive 的默认分隔符。)
–hive-overwrite覆盖 Hive 表中的现有数据。
–create-hive-table创建hive表,若目标表存在,将会创建失败,默认为false
–hive-table < table-name >设置导入 Hive 时要使用的表名。
–hive-drop-import-delims导入 Hive 时从字符串字段中 删除\n、\r和\01 。
–hive-delims-replacement导入 Hive 时,将字符串字段中的 \n、\r和\01 替换为用户定义的字符串。
–hive-partition-key要分区的配置单元字段的名称被分片
–hive-partition-value < v >用作此作业中导入 hive 的分区键的字符串值。
–map-column-hive < map >为配置的列覆盖从 SQL 类型到 Hive 类型的默认映射

如果您的数据库的行包含具有 Hive 的默认行分隔符(\n和\r字符)或列分隔符(\01字符)的字符串字段,则 Hive 使用 Sqoop 导入的数据会出现问题。您可以使用–hive-drop-import-delims选项在导入时删除这些字符以提供与 Hive 兼容的文本数据。或者,您可以使用–hive-delims-replacement选项在导入时将这些字符替换为用户定义的字符串,以提供与 Hive 兼容的文本数据。仅当您使用 Hive 的默认分隔符时才应使用这些选项,如果指定了不同的分隔符,则不应使用这些选项。

Sqoop 会将字段和记录分隔符传递给 Hive。如果不设置任何分隔符并使用–hive-import,则字段分隔符将设置为^A,记录分隔符将设置为\n与 Hive 的默认值一致。

Sqoop 默认将 NULL 值作为字符串导入null。然而,Hive 使用字符串\N来表示NULL值,因此处理NULL(like IS NULL) 的谓词将无法正常工作。如果您希望正确保留值,则应在导入作业或 导出作业–null-string的情况下附加参数。因为 sqoop 在生成的代码中使用这些参数,所以您需要将值正确转义为:–null-non-string–input-null-string–input-null-non-stringNULL\N\N

$ sqoop import \
 --connect jdbc:mysql://192.168.12.211:3306/xl_vip \
 --username root \
 --password root \
 --table goods_order \
 --warehouse-dir /hadoop-cluster/user/hive/warehouse/sqoop_goods_order \
 --hive-table sqoop_goods_order \
 --hive-import \
 --null-string '\\N' \
 --null-non-string '\\N' \
 --create-hive-table \
 --fields-terminated-by "\t"

// 根据sql查询结果集导入
$ sqoop import \
 --connect jdbc:mysql://192.168.12.211:3306/xl_vip \
 --username root \
 --password root \
 --query 'select * from goods_order where market_id = 11 and $CONDITIONS;' \
 --target-dir /hadoop-cluster/user/hive/warehouse/sqoop_goods_order \
 --delete-target-dir \
 --hive-table sqoop_goods_order \
 --hive-import \
 --null-string '\\N' \
 --null-non-string '\\N' \
 --create-hive-table \
 --hive-partition-key 'dt'\
 --hive-partition-value '20220922'\
 -m 1
增量导入

Sqoop 提供了一种增量导入模式,可用于仅检索比某些先前导入的行集更新的行。

Sqoop 支持两种类型的增量导入:append 和 lastmodified。您可以使用 --incremental 参数指定要执行的增量导入的类型。

您应该在导入表时指定附加模式,其中新行不断添加且行 ID 值增加。您可以使用 --check-column 指定包含行 ID 的列。 Sqoop 导入检查列的值大于 --last-value 指定的值的行。

Sqoop 支持的另一种表更新策略称为 lastmodified 模式。当源表的行可能被更新时,您应该使用它,并且每次更新都会将 last-modified 列的值设置为当前时间戳。导入检查列包含的时间戳对比使用 --last-value 指定更新时间戳的行。

在增量导入结束时,应为后续导入指定 --last-value 的值将会打印到屏幕上。运行后续导入时,您应该以这种方式指定 --last-value 以确保仅导入新的或更新的数据。

sqoop-import-all-tables

该import-all-tables工具是将一组表从 RDBMS 导入到 HDFS。每个表中的数据都存储在 HDFS 的单独目录中。
要使该import-all-tables工具生效,必须满足以下条件:

  • 每个表必须有一个单列主键或使用–autoreset-to-one-mapper参数。
  • 您必须导入每张表的所有列。
  • 您不用使用非默认拆分列,也不能通过WHERE子句强加任何条件。
语法
$ sqoop import-all-tables (generic-args) (import-args)
$ sqoop-import-all-tables (generic-args) (import-args)
导入参数
参数描述
–as-avrodatafile将数据导入 Avro 数据文件
–as-sequencefile将数据导入 SequenceFiles
–as-textfile以纯文本形式导入数据(默认)
–as-parquetfile将数据导入 Parquet 文件
–direct使用直接导入快速路径
–inline-lob-limit < n >设置内联 LOB 的最大大小
-m,–num-mappers < n >使用n个map任务并行导入
–warehouse-dir < dir >表目标的 HDFS 父级
-z,–compress启用压缩
–compression-codec < c >使用 Hadoop 编解码器(默认 gzip)
–exclude-tables < tables >要从导入过程中排除的表的逗号分隔列表
–autoreset-to-one-mapper如果遇到没有主键的表,导入应该使用一个映射器
输出行参数
参数描述
–enclosed-by < char >设置必填字段封闭字符
–escaped-by < char >设置转义字符
–fields-terminated-by < char >设置字段分隔符
–lines-terminated-by < char >设置行尾字符
–mysql-delimiters使用 MySQL 的默认分隔符集: fields: , lines: \n escaped-by: \ optional-enclosed-by:’
–optionally-enclosed-by < char >设置字段封闭字符
示例

从xl_vip数据库中导入所有表:

$ sqoop import-all-tables --connect jdbc:mysql://192.168.12.211:3306/xl_vip

导出数据

sqoop-export

该export工具是将一组文件从 HDFS 导出回 RDBMS。目标表必须已经存在于数据库中。根据用户指定的分隔符读取输入文件并解析为一组记录。
默认操作是将这些转换为一组INSERT语句。在“update mode”下,Sqoop 将生成UPDATE替换数据库中现有记录的语句,而在“call mode”下,Sqoop 将对每条记录进行存储过程调用。

语法
$ sqoop export (generic-args) (export-args)
$ sqoop-export (generic-args) (export-args)
导出控制参数
参数描述
–columns < col,col,col… >要导出到表的列
–direct使用直接导出快速路径
–export-dir < dir >导出的 HDFS 源路径
-m,–num-mappers < n >使用n个map任务并行导出
–table < table-name >要填充的表
–call < stored-proc-name >要调用的存储过程
–update-key < col-name >用于更新的锚列。如果有多个列,请使用逗号分隔的列列表。
–update-mode < mode >指定当在数据库中找到具有不匹配键的新行时如何执行更新。mode 的合法值包括 updateonly(默认)和 allowinsert
–input-null-string < null-string >字符串列被解释为 null 的字符串
–input-null-non-string < null-string >对于非字符串列,要解释为 null 的字符串
–staging-table < staging-table-name >在将数据插入目标表之前将在其中暂存数据的表。
–clear-staging-table表示可以删除临时表中存在的任何数据。
–batch使用批处理模式执行基础语句。
插入与更新

默认情况下,sqoop-export将新行追加到表中;每个输入记录都被转换为一个INSERT语句,该语句将一行添加到目标数据库表中。如果您的表具有约束(例如,其值必须唯一的主键列)并且已经包含数据,则必须注意避免插入违反这些约束的记录。INSERT语句如果失败,导出过程将失败。此模式主要用于将记录导出到旨在接收这些结果的新空表。

如果您指定–update-key参数,Sqoop 将改为修改数据库中的现有数据集。每个输入记录都被视为UPDATE修改现有行的语句。语句修改的行由用–update-key指定的列名确定。

例如: 从/user/test/sqoop_goods_order导出到goods_order表,并根据id更新记录

sqoop export \
 --connect jdbc:mysql://192.168.12.211:3306/xl_vip \
 --username root \
 --password root \
 --table goods_order \
 --update-key id \
 --export-dir /user/test/sqoop_goods_order \
 --fields-terminated-by "\t"

作业

sqoop-job

job工具允许您创建和使用已保存的作业。保存的作业会记住用于指定作业的参数,因此可以通过句柄调用作业来重新执行它们。

如果已保存作业配置为执行增量导入,则有关最近导入行的状态会在已保存作业中更新,以允许该作业仅持续导入最新行。

语法
$ sqoop job (generic-args) (job-args) [-- [subtool-name] (subtool-args)]
$ sqoop-job (generic-args) (job-args) [-- [subtool-name] (subtool-args)]
作业管理选项
参数描述
–create < job-id >创建作业; 使用指定的作业 ID(名称)定义一个新的作业。应指定第二个 Sqoop 命令行,用 – 分隔。
–delete < job-id >删除已保存的作业。
–exec < job-id >给定一个用 --create定义的作业,运行已保存的作业。
–show < job-id >显示已保存作业的参数。
–list列出所有已保存的作业
创建作业

创建作业是通过 --create 操作完成的。此操作需要一个作业名称后跟 – 及作业内容。参考:

$ sqoop job --create myjob \
 -- import \
 --connect jdbc:mysql://192.168.12.211:3306/xl_vip \
 --username root \
 --password root \
 --table goods_order \
 --warehouse-dir /hadoop-cluster/user/hive/warehouse/sqoop_goods_order \
 --hive-table sqoop_goods_order \
 --hive-import \
 --null-string '\\N' \
 --null-non-string '\\N' \
 --fields-terminated-by "\t"

这将创建一个名为 myjob 的作业,该作业可以稍后执行。该作业现在已保存在作业列表中,可以通过–list进行查看:

$ sqoop job --list
Available jobs:
  myjob

我们也可以使用 --show 操作检查作业的配置:

$ sqoop job --show myjob
Job: myjob
Tool: import
Options:
----------------------------
verbose = false
hcatalog.drop.and.create.table = false
codegen.output.delimiters.escape = 0
codegen.output.delimiters.enclose.required = false
codegen.input.delimiters.field = 0
split.limit = null
...

如果配置无误,我们可以使用 --exec 运行该作业:

$ sqoop job --exec myjob

–exec 操作允许您通过在 – 之后提供参数来覆盖已保存作业的参数。例如,如果数据库更改为需要用户名,我们可以指定用户名和密码:

$ sqoop job --exec myjob -- --username test -P
Enter password:

HCatalog表

向 Sqoop 公开 HCatalog 表

新的命令行选项
参数描述
–hcatalog-database指定 HCatalog 表的数据库名称。如果未指定,则默认使用 default数据库名称。提供不带 --hcatalog-table 的 --hcatalog-database 选项是错误的。这不是必需的选项。
–hcatalog-table此选项的参数值是 HCatalog 表名。该–hcatalog-table选项的存在表示导入或导出作业是使用 HCatalog 表完成的,它是 HCatalog 作业的必需选项。
–hcatalog-homeHCatalog 安装的主目录。该目录应该有一个 lib 子目录和一个 share/hcatalog 子目录,其中包含必要的 HCatalog 库。如果未指定,将检查系统属性 hcatalog.home,否则将检查系统环境变量 HCAT_HOME。如果这些都没有设置,将使用默认值,当前默认设置为 /usr/lib/hcatalog。这不是必需的选项。
–create-hcatalog-table此选项指定在导入数据时是否应自动创建 HCatalog 表。默认情况下,假设 HCatalog 表存在。表名将与转换为小写的数据库表名相同。
–hcatalog-storage-stanza此选项指定要附加到表的存储节。
–hcatalog-partition-keys和–hcatalog-partition-values这两个选项用于指定多个静态分区键/值对。在之前的版本中, --hive-partition-key 和 --hive-partition-value 选项用于指定静态分区键/值对,但只能提供一级静态分区键。选项 --hcatalog-partition-keys 和 --hcatalog-partition-values 允许将多个键和值作为静态分区键提供。多个选项值用 ,(逗号)分隔。
例如,如果要从中导出/导入的表的 hive 分区键是使用分区键名称年、月和日期定义的,并且具有年 = 1999、月 = 12、日 = 31 的特定分区是所需的分区,则这两个选项的值如下:
	--hcatalog-partition-keys year,month,day
	--hcatalog-partition-values 1999,12,31

为了提供向后兼容性,如果未提供 --hcatalog-partition-keys 或 --hcatalog-partition-values 选项,则将使用 --hive-partitition-key 和 --hive-partition-value(如果提供)。 仅指定 --hcatalog-partition-keys 或 --hcatalog-partition-values 选项之一是错误的。应该提供两个选项,或者两个选项都不提供。

支持的 Sqoop Hive 选项

以下 Sqoop 选项还与 --hcatalog-table 选项一起使用,为 HCatalog 作业提供额外的输入。一些现有的 Hive 导入作业选项与 HCatalog 作业一起重用,而不是为相同目的创建特定于 HCatalog 的选项。

参数描述
–map-column-hive此选项将数据库列映射到具有特定 HCatalog 类型的 HCatalog。
–hive-homeHive 主页位置。
–hive-partition-key用于静态分区过滤器。分区键的类型应为 STRING。只能有一个静态分区键。请参阅有关 --hcatalog-partition-keys 和 --hcatalog-partition-values 选项的讨论。
–hive-partition-value与分区关联的值。请参阅有关 --hcatalog-partition-keys 和 --hcatalog-partition-values 选项的讨论。
不支持的 Sqoop 选项
// 不支持的 Sqoop Hive 导入选项
1. --hive-import
2. --hive-overwrite (忽略选项)

// 不支持的 Sqoop 导出和导入选项
1. --export-dir
2. --target-dir
3. --warehouse-dir
4. --append
5. --as-sequencefile
6. --as-avrodatafile
7. --as-parquetfile

// 忽略的 Sqoop 选项
1. 所有输入分隔符选项都将被忽略。
2. 除非使用 --hive-drop-import-delims 或 --hive-delims-replacement,否则通常会忽略输出分隔符。当指定 --hive-drop-import-delims 或 --hive-delims-replacement 选项时,所有 CHAR 类型的数据库表列将被后处理以分别删除或替换分隔符。仅当 HCatalog 表使用文本格式时才需要这样做。

自动创建表

Sqoop 的主要功能之一是在导入 Hadoop 时管理和创建表元数据。 HCatalog 导入作业还通过选项 --create-hcatalog-table 提供了此功能。此外,HCatalog 集成的重要优势之一是为 Sqoop 数据移动作业提供不可知的存储。为了提供该功能,HCatalog 导入作业提供了一个选项,允许用户为创建的表指定存储格式。

选项 --create-hcatalog-table 用作指示必须将表创建为 HCatalog 导入作业的一部分。如果指定了选项 --create-hcatalog-table 并且表存在,则表创建将失败并且作业将中止。

选项 --hcatalog-storage-stanza 可用于指定新创建表的存储格式。此选项的默认值存储为 rcfile。假定为此选项指定的值是有效的 Hive 存储格式表达式。作为自动表创建的一部分,它将附加到 HCatalog 导入作业生成的 create table 命令中。存储节中的任何错误都将导致表创建失败并且导入作业将被中止。

支持选项 --hcatalog-storage-stanza 中引用的存储格式所需的任何其他资源都应通过将它们放在 $HIVE_HOME/lib 中或通过在 HADOOP_CLASSPATH 和 LIBJAR 文件中来提供给作业。

如果指定了 --hive-partition-key 选项,则该选项的值用作新创建表的分区键。使用此选项只能指定一个分区键。

映射到 HCatalog 表时,对象名称将映射到下面指定的小写等效项。这包括表名(与转换为小写的外部存储表名相同)和字段名。

分隔文本格式以及字段和行分隔符

HCatalog 支持分隔文本格式作为表格存储格式之一。但是当使用分隔文本并且导入的数据有包含这些分隔符的字段时,Hive 可能会将数据解析为不同数量的字段和记录,从而失去数据保真度。

对于这种情况,可以使用以下现有 Sqoop 导入选项之一:

--hive-delims-replacement
--hive-drop-import-delims

如果为导入提供了这些选项中的任何一个,则任何类型为 STRING 的列都将使用 Hive 分隔符处理进行格式化,然后写入 HCatalog 表。

HCatalog 表要求

如果默认表创建选项(带有可选存储节)不够用,则应在将其用作 Sqoop 作业的一部分之前创建 HCatalog 表。 HCatalog 支持的所有存储格式都可以用于创建 HCatalog 表。这使得该功能很容易采用 Hive 项目中的新存储格式,例如 ORC 文件。

支持分区

Sqoop HCatalog 功能支持以下表类型:

未分区表
指定了静态分区键的分区表
具有来自数据库结果集中的动态分区键的分区表
具有静态键和附加动态分区键组合的分区表

模式映射

Sqoop 目前不支持列名映射。但是,允许用户覆盖类型映射。类型映射遵循 Sqoop 中已经存在的 Hive 类型映射,除了 SQL 类型 FLOAT 和 REAL 映射到 HCatalog 类型 float。在 Hive 的 Sqoop 类型映射中,这两个映射为 double。类型映射主要用于检查列定义的正确性,并且可以使用 --map-column-hive 选项覆盖。

除了二进制之外的所有类型都可以分配给字符串类型。

在导出和导入期间,任何数字类型的字段(int、shortint、tinyint、bigint 和 bigdecimal、float 和 double)都可以分配给任何数字类型的另一个字段。根据分配的目标类型的精度和规模,可能会发生截断。

此外,日期/时间/时间戳映射到日期/时间戳配置单元类型。(完整的日期/时间/时间戳表示)。Date/time/timstamp 列也可以映射到 bigint Hive 类型,在这种情况下,该值将是自纪元以来的毫秒数。

BLOB 和 CLOB 仅支持导入。导入时的 BLOB/CLOB 对象以特定于 Sqoop 的格式存储,在 Pig/Hive 作业或另一个 MapReduce 作业中处理这些对象时需要了解这种格式。

当映射到 HCatalog 字段时,数据库列名将映射到它们的小写等效项。目前,不支持区分大小写的数据库对象名称。

允许将一组列从表投影到 HCatalog 表或加载到列投影,但受表约束。将数据导入 HCatalog 表时,动态分区列(如果有)必须是投影的一部分。

动态分区字段应映射到使用 NOT NULL 属性定义的数据库列(尽管在模式映射期间不强制执行此操作)。动态分区列的导入过程中的空值将中止 Sqoop 作业。

支持 HCatalog 数据类型

支持 Hive 0.13 版本中的所有原始 Hive 类型。目前不支持所有复杂的 HCatalog 类型。

BLOB/CLOB 数据库类型仅支持导入。

例子

// 创建一个 HCatalog 表
hcat -e "create table txn(txn_date string, cust_id string, amount float, store_id int) partitioned by (cust_id string) stored as rcfile;"

// import
$sqoop import --connect <jdbc-url> -table <table-name> --hcatalog-table txn <other sqoop options>

// export
$sqoop export --connect <jdbc-url> -table <table-name> --hcatalog-table txn <other sqoop options>

使用Dolphinscheduler调度Sqoop

Dolphinscheduler地址

  1. 选择项目管理模块,在工作流定义中创建一个工作流
  2. 从左侧工具栏中选择sqoop工具
  3. 输入节点名称等参数,
  • 选择自定义任务 输入自定义脚本执行任务
  • 使用配置的方式执行任务,需配置输入、输出、mysql连接信息、hdfs或hive的信息
  1. 配置完成后,保存并在工作流定义页面上线工作流
  2. 点击运行,即可执行任务;可在任务实例中查看任务执行情况和执行日志,在工作流实例中查看工作流的执行情况。

参考文档

https://sqoop.apache.org/docs/1.4.6/SqoopUserGuide.html

Logo

开放原子开发者工作坊旨在鼓励更多人参与开源活动,与志同道合的开发者们相互交流开发经验、分享开发心得、获取前沿技术趋势。工作坊有多种形式的开发者活动,如meetup、训练营等,主打技术交流,干货满满,真诚地邀请各位开发者共同参与!

更多推荐