我们可以有很多方式可以把数据导入到hbase当中,比如说用map-reduce,使用TableOutputFormat这个类,但是这种方式不是最优的方式。
Bulk的方式直接生成HFiles,写入到文件系统当中,这种方式的效率很高。
一般的步骤有两步:
(1)使用ImportTsv或者import工具或者自己写程序用hive/pig生成HFiles
(2)用completebulkload把HFiles加载到hdfs上
ImportTsv能把用Tab分隔的数据很方便的导入到hbase当中,但还有很多数据不是用Tab分隔的 下面我们介绍如何使用hive来导入数据到hbase当中。
1.准备输入内容
a.创建一个tables.ddl文件
-- pagecounts data comes from http://dumps.wikimedia.org/other/
pagecounts-raw/
-- documented http://www.mediawiki.org/wiki/Analytics/Wikistats
-- define an external table over raw pagecounts data
CREATE TABLE IF NOT EXISTS pagecounts (projectcode STRING, pagename
STRING, pageviews STRING, bytes STRING)
ROW FORMAT
DELIMITED FIELDS TERMINATED BY ' '
LINES TERMINATED BY '\n'
STORED AS TEXTFILE
LOCATION '/tmp/wikistats';
-- create a view, building a custom hbase rowkey
CREATE VIEW IF NOT EXISTS pgc (rowkey, pageviews, bytes) AS
SELECT concat_ws('/',
projectcode,
concat_ws('/',
pagename,
regexp_extract(INPUT__FILE__NAME, 'pagecounts-(\\d{8}-\\d{6})\
\..*$', 1))),
pageviews, bytes
FROM pagecounts;
-- create a table to hold the input split partitions
CREATE EXTERNAL TABLE IF NOT EXISTS hbase_splits(partition STRING)
ROW FORMAT
SERDE 'org.apache.hadoop.hive.serde2.binarysortable.
BinarySortableSerDe'
STORED AS
INPUTFORMAT 'org.apache.hadoop.mapred.TextInputFormat'
OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.
HiveNullValueSequenceFileOutputFormat'
LOCATION '/tmp/hbase_splits_out';
-- create a location to store the resulting HFiles
CREATE TABLE hbase_hfiles(rowkey STRING, pageviews STRING, bytes STRING)
STORED AS
INPUTFORMAT 'org.apache.hadoop.mapred.TextInputFormat'
OUTPUTFORMAT 'org.apache.hadoop.hive.hbase.HiveHFileOutputFormat'
TBLPROPERTIES('hfile.family.path' = '/tmp/hbase_hfiles/w');
b.创建HFils分隔文件,例子:sample.hql
-- prepate range partitioning of hfiles
ADD JAR /usr/lib/hive/lib/hive-contrib-0.11.0.1.3.0.0-104.jar;
SET mapred.reduce.tasks=1;
CREATE TEMPORARY FUNCTION row_seq AS 'org.apache.hadoop.hive.contrib.udf.
UDFRowSequence';
-- input file contains ~4mm records. Sample it so as to produce 5 input
splits.
INSERT OVERWRITE TABLE hbase_splits
SELECT rowkey FROM
(SELECT rowkey, row_seq() AS seq FROM pgc
TABLESAMPLE(BUCKET 1 OUT OF 10000 ON rowkey) s
ORDER BY rowkey
LIMIT 400) x
WHERE (seq % 100) = 0
ORDER BY rowkey
LIMIT 4;
-- after this is finished, combined the splits file:
dfs -cp /tmp/hbase_splits_out/* /tmp/hbase_splits;
c.创建hfiles.hql
ADD JAR /usr/lib/hbase/hbase-0.94.6.1.3.0.0-104-security.jar;
ADD JAR /usr/lib/hive/lib/hive-hbase-handler-0.11.0.1.3.0.0-104.jar;
SET mapred.reduce.tasks=5;
SET hive.mapred.partitioner=org.apache.hadoop.mapred.lib.
TotalOrderPartitioner;
SET total.order.partitioner.path=/tmp/hbase_splits;
-- generate hfiles using the splits ranges
INSERT OVERWRITE TABLE hbase_hfiles
SELECT * FROM pgc
CLUSTER BY rowkey;
2.导入数据
注意:/$Path_to_Input_Files_on_Hive_Client是hive客户端的数据存储目录
mkdir /$Path_to_Input_Files_on_Hive_Client/wikistats
wget http://dumps.wikimedia.org/other/pagecounts-raw/2008/2008-10/
pagecounts-20081001-000000.gz
hadoop fs -mkdir /$Path_to_Input_Files_on_Hive_Client/wikistats
hadoop fs -put pagecounts-20081001-000000.
gz /$Path_to_Input_Files_on_Hive_Client/wikistats/
3.创建必要的表
注意:$HCATALOG_USER是HCatalog服务的用户(默认是hcat)
$HCATALOG_USER-f /$Path_to_Input_Files_on_Hive_Client/tables.ddl
执行之后,我们会看到如下的提示:
OK
Time taken: 1.886 seconds
OK
Time taken: 0.654 seconds
OK
Time taken: 0.047 seconds
OK
Time taken: 0.115 seconds
4.确认表已经正确创建
执行以下语句
$HIVE_USER-e "select * from pagecounts limit 10;"
执行之后,我们会看到如下的提示:
...
OK
aa Main_Page 4 41431
aa Special:ListUsers 1 5555
aa Special:Listusers 1 1052
再执行
$HIVE_USER-e "select * from pgc limit 10;"
执行之后,我们会看到如下的提示:
...
OK
aa/Main_Page/20081001-000000 4 41431
aa/Special:ListUsers/20081001-000000 1 5555
aa/Special:Listusers/20081001-000000 1 1052
...
5.生成HFiles分隔文件
$HIVE_USER-f /$Path_to_Input_Files_on_Hive_Client/sample.hql
hadoop fs -ls /$Path_to_Input_Files_on_Hive_Client/hbase_splits
为了确认,执行以下命令
hadoop jar /usr/lib/hadoop/contrib/streaming/hadoop-streaming-1.2.0.1.
3.0.0-104.jar -libjars /usr/lib/hive/lib/hive-exec-0.11.0.1.3.0.0-104.
jar -input /tmp/hbase_splits -output /tmp/hbase_splits_txt -inputformat
SequenceFileAsTextInputFormat
执行之后,我们会看到如下的提示:
...
INFO streaming.StreamJob: Output: /tmp/hbase_splits_txt
再执行这一句
hadoop fs -cat /tmp/hbase_splits_txt/*
执行之后,我们会看到类似这样的结果
61 66 2e 71 2f 4d 61 69 6e 5f 50 61 67 65 2f 32 30 30 38 31 30 30 31 2d 30
30 30 30 30 00 (null)
61 66 2f 31 35 35 30 2f 32 30 30 38 31 30 30 31 2d 30 30 30 30 30 30 00
(null)
61 66 2f 32 38 5f 4d 61 61 72 74 2f 32 30 30 38 31 30 30 31 2d 30 30 30
30 30 00 (null)
61 66 2f 42 65 65 6c 64 3a 31 30 30 5f 31 38 33 30 2e 4a 50 47 2f 32 30
38 31 30 30 31 2d 30 30 30 30 30 30 00 (null)
6.生成HFiles
HADOOP_CLASSPATH=/usr/lib/hbase/hbase-0.94.6.1.3.0.0-104-security.jar
hive -f /$Path_to_Input_Files_on_Hive_Client/hfiles.hql
以上内容是hdp的用户手册中推荐的方式,然后我顺便也从网上把最后的一步的命令格式给找出来了。
hadoop jar hbase-VERSION.jar completebulkload /user/todd/myoutput mytable