这是CDP中Apache Hive3用户指南系列之一,之前的文章请参考<CDP的Hive3系列之Hive Metastore介绍>,<CDP中的Hive3系列之Apache Hive3的特性>,<CDP中的Hive3系列之启动Apache Hive3>,<CDP中的Hive3系列之Hive3使用指南>,<CDP中的Hive3系列之管理Hive3>,<CDP中的Hive3系列之管理Hive的工作负载>,<CDP中的Hive3系列之配置Apache Hive3>,<CDP中的Hive3系列之保护Hive3>和<CDP中的Hive3系列之Hive性能调优>.
1 数据迁移到 Apache Hive
要将数据从 RDBMS(例如 MySQL)迁移到 Hive,您应该考虑使用带有 Teradata 连接器的 CDP 中的 Apache Sqoop。Apache Sqoop 客户端基于 CLI 的工具在关系数据库和 HDFS 或云对象存储(包括 Amazon S3 和 Microsoft ADLS)之间批量传输数据。
需要进行提取、转换和加载 (ETL) 过程的遗留系统数据源通常驻留在文件系统或对象存储中。您还可以以分隔文本(默认)或 SequenceFile 格式导入数据,然后将数据转换为 Hive 推荐的 ORC 格式。通常,为了在 Hive 中查询数据,ORC 是首选格式,因为 ORC 提供了性能增强。
1.1 将 Sqoop 与 Teradata 连接器结合使用
CDP 不支持使用 Hadoopjar命令(Java API)的 Sqoop 导出。Teradata 的连接器文档包括使用此 API 的说明。据报道,CDP 用户将不受支持的 API 命令(例如 )误认为是 -forcestage受支持的 Sqoop 命令,例如 –-staging-force。Cloudera 仅支持通过使用由 Teradata 提供支持的 Cloudera 连接器中记录的命令使用 Sqoop 。Cloudera 不支持将 Sqoop 与 Hadoop jar命令一起使用,例如 Teradata Connector for Hadoop 教程中所述的命令。
1.2 Cloudera 网站上的 Apache Sqoop 文档
要访问最新的 Sqoop 文档,请转到Sqoop 文档 1.4.7.7.1.6.0。
2 设置 Sqoop
Cloudera Runtime 包括 Sqoop 客户端,用于将数据从不同数据源批量导入和导出到 Hive。您将学习如何在 CDP 中安装 RDBMS 连接器和 Sqoop 客户端。
1) 在 Cloudera Manager 的 Clusters 中,从选项菜单中选择 Add Service。
2) 选择 Sqoop 客户端并单击继续。
3) 分别根据 Sqoop 导入或导出的源或目标的数据源选择 JDBC 数据库驱动程序。
4) /var/lib/sqoop在 Sqoop 节点上安装 JDBC 数据库驱动程序。
不要安装,/opt/cloudera/parcels/CDH因为升级修改了这个目录。
· MySQL:将 MySQL 驱动下载 https://dev.mysql.com/downloads/connector/j/ 到 /var/lib/sqoop,然后运行tar -xvzf mysql-connector-java-<version>.tar.gz.
· Oracle:从 下载驱动程序 https://www.oracle.com/technetwork/database/enterprise-edition/jdbc-112010-090769.html 并将其放入/var/lib/sqoop.
· PostgreSQL:从下载驱动程序 http://jdbc.postgresql.org/download.html并将其放入 /var/lib/sqoop.
5) 在 Cloudera Manager 中,单击 操作>部署客户端配置。
设置 Sqoop 客户端后,您可以使用以下连接字符串输入Sqoop 命令,具体取决于您的数据源。
· MySQL 语法:
jdbc:mysql://<HOST>:<PORT>/<DATABASE_NAME>
例子:
jdbc:mysql://my_mysql_server_hostname:3306/my_database_name
· Oracle语法:
jdbc:oracle:thin:@<HOST>:<PORT>:<DATABASE_NAME>
例子:
jdbc:oracle:thin:@my_oracle_server_hostname:1521:my_database_name
· PostgreSQL 语法:
jdbc:postgresql://<HOST>:<PORT>/<DATABASE_NAME>
例子:
jdbc:postgresql://my_postgres_server_hostname:5432/my_database_name
· Netezza 语法:
jdbc:netezza://<HOST>:<PORT>/<DATABASE_NAME>
例子:
jdbc:netezza://my_netezza_server_hostname:5480/my_database_name
· Teradata 语法:
jdbc:teradata://<HOST>/DBS_PORT=1025/DATABASE=<DATABASE_NAME>
例子:
jdbc:teradata://my_teradata_server_hostname/DBS_PORT=1025/DATABASE=my_database_name
3 将数据从数据库移动到 Apache Hive
您可以使用 Sqoop 将数据从关系数据库导入到 Hive 中使用。您可以将数据直接导入 Hive。假设 Sqoop 客户端服务在您的集群中可用,您可以从命令行发出导入和导出命令。
3.1 创建 Sqoop 导入命令
您可以创建一个 Sqoop 导入命令,该命令使用 Apache Sqoop 将来自不同数据源(例如不同网络上的关系数据库)的数据导入 Apache Hive。
您在 Hive 集群的命令行中输入 Sqoop 导入命令,将数据从数据源导入集群文件系统和 Hive。导入可以包括以下信息,例如:
· 数据库连接信息:数据库URI、数据库名称、连接协议,如 jdbc:mysql:
· 要导入的数据
· 用于快速数据传输的并行处理指令
· 导入数据的目的地
Sqoop 经测试可与 Connector/J 5.1 一起使用。如果您已升级到 Connector/J 8.0,并且想要使用该zeroDateTimeBehavior属性来处理 DATE 列中“0000-00-00\”的值,请zeroDateTimeBehavior=CONVERT_TO_NULL在连接字符串中明确指定 。例如,--connect jdbc:mysql://<MySQL host>/<DB>?zeroDateTimeBehavior=CONVERT_TO_NULL。
1) 创建一个导入命令,指定到 RDBMS 的 Sqoop 连接。
n 要在命令行上输入数据源的密码,请使用-P连接字符串中的 选项。
n 要指定存储密码的文件,请使用该 --password-file选项。
n 命令行密码:
sqoop import --connect jdbc:mysql://db.foo.com:3306/bar \ <data to import> \ --username <username> \ -P
指定密码文件:
sqoop import --connect jdbc:mysql://db.foo.com:3306/bar \ --table EMPLOYEES \ --username <username> \ --password-file ${user.home}/.password
2) 在命令中指定要导入的数据。
n 导入整个表。
n 导入列的子集。
n 使用自由格式查询导入数据。
整个表:
sqoop import \ --connect jdbc:mysql://db.foo.com:3306/bar \ --table EMPLOYEES
列子集:
sqoop import --connect jdbc:mysql://db.foo.com:3306/bar \ --table EMPLOYEES \ --columns "employee_id,first_name,last_name,job_title"
导入最新数据的自由格式查询:
sqoop import \ --connect jdbc:mysql://db.foo.com:3306/bar \ --table EMPLOYEES \ --where "start_date > '2018-01-01'"
3) 或者,在 import 语句中指定write parallelism以并行执行多个 map 任务:
n 设置映射器:如果源表有主键,则使用--num-mappers.
n 拆分方式:如果主键分布不均,请使用以下方法提供拆分键 --split-by
n 顺序:如果您没有主键或拆分键,请使用--num-mappers 1或 --autoreset-to-one-mapper在查询中顺序导入数据。
n 设置映射器:
sqoop import --connect jdbc:mysql://db.foo.com:3306/bar \ --table EMPLOYEES \ --num-mappers 8 \
n Split:
sqoop import --connect jdbc:mysql://db.foo.com:3306/bar \ --table EMPLOYEES \ --split-by dept_id
n 设置映射器均匀地分割源表的主键范围。
n Split by 使用拆分键而不是主键均匀拆分数据。
4) 指定使用 Hive 默认分隔符将数据导入 Hive --hive-import。
5) 指定数据的 Hive 目标。
n 如果您认为 Hive 中不存在该表,请命名数据库和表,并使用 --create-hive-table 选项。
n 如果要将导入的数据插入到现有的 Hive 外部表中,请命名数据库和表,但不要使用 --create-hive-table 选项。
此命令将 MySQL EMPLOYEES 表导入到仓库中命名的新 Hive 表中。
sqoop import --connect jdbc:mysql://db.foo.com:3306/corp \ --table EMPLOYEES \ --hive-import \ --create-hive-table \ --hive-database 'mydb' \ --hive-table 'newtable'
此命令将 MySQL EMPLOYEES 表导入到 HDFS 中的外部表。
/
sqoop import --connect jdbc:mysql://db.foo.com:3306/corp \ --table EMPLOYEES \ --hive-import \ --hive-database 'mydb' \ --hive-table 'myexternaltable'
如上所示,在单独的行(推荐)中指定用单引号括起来的数据库和表名称。或者在一行中指定数据库和表名,并用反引号将数据库和表名括起来。
--hive-table `mydb`.`myexternaltable`
由于 Hive-16907 错误修复,Hive 拒绝 SQL 查询中的 `db.table`。表名中不再允许使用点 (.)。您需要更改使用此类引用的查询,以防止 Hive 将整个 db.table 字符串解释为表名。
3.2 将 RDBMS 数据导入 Hive
您可以测试Apache Sqoop 导入命令,然后执行该命令将关系数据库表导入Apache Hive。
您可以在 Hive 集群的命令行中输入 Sqoop import 命令,以将数据从数据源导入 Hive。您可以在实际执行之前测试导入语句。
· Apache Sqoop 客户端服务可用。
· Hive Metastore 和 Hive 服务可用。
1) 或者,在执行前使用 eval 选项测试导入命令。
sqoop eval --connect jdbc:mysql://db.foo.com/bar \ --query "SELECT * FROM employees LIMIT 10"
select 语句的输出出现,其中列出了 RDBMS 员工表中的 10 行数据。
2) 执行 Sqoop 导入命令,指定到 RDBMS 的 Sqoop 连接、要导入的数据以及目标 Hive 表名称。
此命令将 MySQL EMPLOYEES 表导入到仓库中命名的新 Hive 表中。
sqoop import --connect jdbc:mysql://db.foo.com:3306/corp \ --table EMPLOYEES \ --hive-import \ --create-hive-table \ --hive-table mydb.newtable
4 将数据从 HDFS 移动到 Apache Hive
在CDP Private Cloud Base 中,您可以将不同数据源的数据导入 HDFS,进行 ETL 处理,然后在 Apache Hive 中查询数据。
4.1 将 RDBMS 数据导入 HDFS
在CDP Private Cloud Base 中,您创建一个 Sqoop 导入命令,将数据从关系数据库导入 HDFS。
您在集群的命令行中输入 Sqoop import 命令以将数据导入 HDFS。导入命令需要包含数据库URI、数据库名称和连接协议,如jdbc:mysql:m和要导入的数据。可选地,该命令可以包括用于快速数据传输的并行处理指令、导入数据的 HDFS 目标目录、数据分隔符和其他信息。如果未指定其他位置,则使用默认 HDFS 目录。字段以逗号分隔,行以行分隔。您可以在实际执行之前测试导入语句。
· Apache Sqoop 已安装和配置。
1) 创建一个导入命令,指定与您要导入的数据源的 Sqoop 连接。
n 如果要在命令行上为数据源输入密码,请使用-P连接字符串中的选项。
n 如果要指定存储密码的文件,请使用该 --password-file选项。
n 命令行密码:
sqoop import --connect jdbc:mysql://db.foo.com/bar \ <data to import> \ --username <username> \ -P
n 指定密码文件:
sqoop import --connect jdbc:mysql://db.foo.com/bar \ --table EMPLOYEES \ --username <username> \ --password-file ${user.home}/.password
2) 在命令中指定要导入的数据。
n 导入整个表。
n 导入列的子集。
n 使用自由格式查询导入数据。
整个表:
sqoop import \ --connect jdbc:mysql://db.foo.com/bar \ --table EMPLOYEES
列子集:
sqoop import \ --connect jdbc:mysql://db.foo.com/bar \ --table EMPLOYEES \ --columns "employee_id,first_name,last_name,job_title"
导入最新数据的自由格式查询:
sqoop import \ --connect jdbc:mysql://db.foo.com/bar \ --table EMPLOYEES \ --where "start_date > '2018-01-01'"
3) 使用--target-dir选项指定导入数据的目的地 。
此命令使用默认文本文件分隔符将从 MySQL EMPLOYEES 表导入的数据附加到 HDFS 目标目录中的输出文件。
sqoop import \ --connect jdbc:mysql://db.foo.com:3600/bar \ --table EMPLOYEES \ --where "id > 100000" \ --target-dir /incremental_dataset \ --append
该命令按列拆分导入的数据,并指定将数据导入到 HDFS 目标目录中的输出文件中。
sqoop import \ --connect jdbc:mysql://db.foo.com:3600/bar \ --query 'SELECT a.*, b.* \ FROM a JOIN b on (a.id == b.id) \ WHERE $CONDITIONS' \ --split-by a.id \ --target-dir /user/foo/joinresults
此命令执行一次并使用 -m 1 选项指定的单个映射任务串行导入数据:
sqoop import \ --connect jdbc:mysql://db.foo.com:3600/bar \ --query \ 'SELECT a.*, b.* \ FROM a \ JOIN b on (a.id == b.id) \ WHERE $CONDITIONS' \ -m 1 \ --target-dir /user/foo/joinresults
4) 或者,在 import 语句中指定write parallelism以并行执行多个 map 任务:
n 设置映射器:如果源表有主键,则使用--num-mappers.
n 拆分方式:如果主键分布不均,请使用以下方法提供拆分键 --split-by
n 顺序:如果您没有主键或拆分键,请使用--num-mappers 1或 --autoreset-to-one-mapper在查询中顺序导入数据。
n 设置映射器:
sqoop import --connect jdbc:mysql://db.foo.com:3306/bar \ --table EMPLOYEES \ --num-mappers 8
n 拆分:
sqoop import --connect jdbc:mysql://db.foo.com:3306/bar \ --table EMPLOYEES \ --split-by dept_id
n 设置映射器均匀地分割源表的主键范围。
n Split by 使用拆分键而不是主键均匀拆分数据。
5) 或者,在执行前使用 eval 选项测试导入命令。
sqoop eval --connect jdbc:mysql://db.foo.com:3306/bar \ --query "SELECT * FROM employees LIMIT 10"
将出现 select 语句的输出。
4.2 将 HDFS 文件转换为 ORC
在CDP Private Cloud Base 中,要使用 Hive 查询 HDFS 中的数据,您需要对数据应用架构,然后以 ORC 格式存储数据。
要将HDFS中存储的数据转换成Hive中推荐的查询格式,您可以通过创建Hive外部表的方式为HDFS数据创建一个schema,然后创建一个Hive托管的表来转换和查询ORC格式的数据。转换是一个并行的分布式动作,不需要独立的 ORC 转换工具。假设您有以下 CSV 文件,其中包含描述字段的标题行和包含以下数据的后续行:
Name,Miles_per_Gallon,Cylinders,Displacement,Horsepower,Weight_in_lbs, \ Acceleration,Year,Origin "chevrolet chevelle malibu",18,8,307,130,3504,12,1970-01-01,A "buick skylark 320",15,8,350,165,3693,11.5,1970-01-01,A "plymouth satellite",18,8,318,150,3436,11,1970-01-01,A "amc rebel sst",16,8,304,150,3433,12,1970-01-01,A "ford torino",17,8,302,140,3449,10.5,1970-01-01,A
您从 CSV 文件中删除了标题。
1) 创建外部表:
CREATE EXTERNAL TABLE IF NOT EXISTS Cars( Name STRING, Miles_per_Gallon INT, Cylinders INT, Displacement INT, Horsepower INT, Weight_in_lbs INT, Acceleration DECIMAL, Year DATE, Origin CHAR(1)) COMMENT 'Data about cars from a public database' ROW FORMAT DELIMITED FIELDS TERMINATED BY ',' STORED AS TEXTFILE location '/user/<username>/visdata';
2) 创建 Hive 管理的表以将数据转换为 ORC。
CREATE TABLE IF NOT EXISTS mycars( Name STRING, Miles_per_Gallon INT, Cylinders INT, Displacement INT, Horsepower INT, Weight_in_lbs INT, Acceleration DECIMAL, Year DATE, Origin CHAR(1)) COMMENT 'Data about cars from a public database' ROW FORMAT DELIMITED FIELDS TERMINATED BY ',' STORED AS ORC;
3) 将外部表中的数据插入到 Hive 管理的表中。
INSERT OVERWRITE TABLE mycars SELECT * FROM cars;
4) 验证您是否将数据正确导入到 ORC 格式的表中:
hive> SELECT * FROM mycars LIMIT 3; OK "chevrolet chevelle malibu" 18 8 307 130 3504 12 1970-01-01 A "buick skylark 320" 15 8 350 165 3693 12 1970-01-01 A "plymouth satellite" 18 8 318 150 3436 11 1970-01-01 A Time taken: 0.144 seconds, Fetched: 3 row(s)
4.3 增量更新导入的表
在CDP Private Cloud Base 中,更新导入的表涉及使用 Apache Sqoop 导入对原始表所做的增量更改,然后将更改与导入到 Apache Hive 的表合并。
在将操作数据库中的数据摄取到 Hive 后,您通常需要设置一个流程来定期将导入的表与操作数据库表同步。基表是在第一次数据摄取期间创建的 Hive 管理的表。从操作数据库系统增量更新 Hive 表涉及合并基表和更改记录以反映最新的记录集。您将增量表创建为 Hive 外部表,通常来自 HDFS 中的 CSV 数据,以存储更改记录。此外部表包含自上次数据摄取以来操作数据库中的更改(插入和更新)。一般情况下,表是分区的,只更新最新的分区,使这个过程更有效率。
您可以使用 Oozie 自动执行以增量方式更新 Hive 中数据的步骤。
· 第一次将数据摄取到 hive 中时,您将整个基表以 ORC 格式存储在 Hive 中。
· 将其从外部表移动到 Hive 管理的表后的基表定义具有以下架构:
CREATE TABLE base_table ( id STRING, field1 STRING, modified_date DATE) ROW FORMAT DELIMITED FIELDS TERMINATED BY ',';
1) 将增量表作为外部表存储在 Hive 中,并获取比 更新的记录last_import_date,这是上次增量数据更新的日期。
自上次更新数据以来,您经常导入增量更改,然后将其合并。
· 使用--check-column获取记录
· 用于--query获取记录
sqoop import --connect jdbc:teradata://{host name}/Database=retail --connection-manager org.apache.sqoop.teradata.TeradataConnManager --username dbc --password dbc --table SOURCE_TBL --target-dir /user/hive/incremental_table -m 1 --check-column modified_date --incremental lastmodified --last-value {last_import_date} sqoop import --connect jdbc:teradata://{host name}/Database=retail --connection-manager org.apache.sqoop.teradata.TeradataConnManager --username dbc --password dbc --target-dir /user/hive/incremental_table -m 1 --query 'select * from SOURCE_TBL where modified_date > {last_import_date} AND $CONDITIONS’
2) 使用 Sqoop 将增量表数据移入 HDFS 后,您可以使用以下命令在其上定义外部 Hive 表
CREATE EXTERNAL TABLE incremental_table ( id STRING, field1 STRING, modified_date DATE) ROW FORMAT DELIMITED FIELDS TERMINATED BY ',' STORED AS TEXTFILE location '/user/hive/incremental_table';
3) 使用 MERGE 命令合并数据并协调基表记录与新记录:
MERGE INTO base_table USING incremental_table ON base.id = incremental_table.id WHEN MATCHED THEN UPDATE SET fieldl1=incremental_table.email, modified_date=incremental_table.state WHEN NOT MATCHED THEN INSERT VALUES(incremental_table.id, incremental_table.field1, incremental_table.modified_data);