本文主要介绍如何使用Hive来处理保存在OSS上的数据源,并通过E-MapReduce计算,最终的结果保存在OSS上,并能够每天自动的进行Hive的分区数据的调度
处理条件:
数据源:我们假设在OSS上我们的数据是按照一定的目录格式来保存的,比如时间,按照类似2016/06/01这样的年/月/日的方式存放。而原始数据内容都是一些非格式化的数据,完全没有经过处理。
类似如下的一个格式:
123|service control exceed 100. others content|192.168.0.1|2016-05-31
结果数据:我们需要把每个目录下的数据经过处理,写到OSS上类似2016/06/01的一个结果目录下
处理过程:
创建元数据表
CREATE EXTERNAL TABLE logoss (logcontent string) partitioned by (year string, month string, day string) stored AS textfile location 'oss://akid:aksecret@bucket.oss-cn-hangzhou-internal.aliyuncs.com/path';
通过这一步,我们有了一张Hive的分区表,Hive只是在它的元数据库中记录了这个表的信息,这个时候还没有数据的处理。而数据也还在我们的OSS上躺着。
接着把需要的分区都加入到表中,这里我假设我们有很多个分区
ALTER TABLE logoss ADD PARTITION (year='2016', month='05', day='31') location 'oss://akid:aksecret@bucket.oss-cn-hangzhou-internal.aliyuncs.com/path/2016/05/31' PARTITION (year='2016', month='06', day='01') location 'oss://akid:aksecret@bucket.oss-cn-hangzhou-internal.aliyuncs.com/path/2016/06/01' PARTITION (year='2016', month='06', day='02') location 'oss://akid:aksecret@bucket.oss-cn-hangzhou-internal.aliyuncs.com/path/2016/06/02' PARTITION (year='2016', month='06', day='03') location 'oss://akid:aksecret@bucket.oss-cn-hangzhou-internal.aliyuncs.com/path/2016/06/03';
接下来我们select数据看一下,执行如下
select * from logoss limit 100;
我们就会看到我们的分区中的内容了。
处理原始数据
我们要把原来OSS上的原始数据,经过处理然后写到一个HDFS上的表,然后用这个HDFS的表进行后续的一系列处理。这里把所有的中间步骤都在HDFS上走,这样速度会快很多。
首先建立一个基于HDFS的Hive表,目前数据也还是空
CREATE TABLE loghdfs (id string, content string, ip string, oridate string) partitioned by (year string, month string, day string) stored AS textfile;
然后将OSS的数据进行处理并写入到HDFS的表中,这里我们使用IF NOT EXISTS,为了防止这个分区已经存在被我们覆盖掉,如果你希望数据直接覆盖,可以去掉这个条件判断。
INSERT OVERWRITE TABLE loghdfs PARTITION (year='2016', month='05', day='31') IF NOT EXISTS select split(logcontent,'\\|')[0] as id, split(logcontent,'\\|')[1] as content, split(logcontent,'\\|')[2] as ip, split(logcontent,'\\|')[3] as oridate FROM logoss;
业务处理
好了,到了这一步,我们就已经有了一个hdfs上的表了,我们可以对这个表进行任意的后续处理,
比如groupby 所有的ip,然后看他们的总数值
CREATE TABLE userip as select ip, count(id) from loghdfs group by ip;
中间可以进行类似的各种操作,由你的业务决定。
当所有的操作都完成以后,如果要把数据写到OSS上,那么来到最后一步
写回OSS
首先我们会创建一个对应OSS路径的Hive表,与第一步很类似
CREATE EXTERNAL TABLE resultoss (ip string, count int) partitioned by (year string, month string, day string) stored AS textfile location 'oss://akid:aksecret@bucket.oss-cn-hangzhou-internal.aliyuncs.com/path';
最后把我们的业务数据写入到对应的分区中去
INSERT OVERWRITE TABLE resultoss PARTITION (year='2016', month='05', day='31') IF NOT EXISTS select ip, count FROM userip;
这样我们的结果数据就写到了OSS上对应的目录下,类似这样的路径
/path/year=2016/month=05/day=31/
如何自动化
看了上面的这个过程,会发现这中间这个时间的分区需要我们手工写在里面,实在是太麻烦了,完全没有办法自动跑啊,那么下面我们就来更加进化一下。
job上配置自动时间
我们首先在E-MapReduce控制台上编辑的时候使用hivevar来指定时间变量,如下
-hivevar year='2016' -hivevar month='05' -hivevar day='31' -f ossref://mypath/job.hql
然后,我们需要把这个里面的常量变成每天自动变化的时间,我们使用E-MapReduce提供的时间变量
如下
-hivevar year=' ${yyyy-1d}' -hivevar month=' ${MM-1d}' -hivevar day=' ${dd-1d}' -f ossref://mypath/job.hql
时间配置的说明请参考这里
完整的作业配置及代码
现在我们看看修改完成以后的完整的代码,中间的分区时间都是用变量进行了替换
CREATE EXTERNAL TABLE logoss (logcontent string) partitioned by (year string, month string, day string) stored AS textfile location 'oss://akid:aksecret@bucket.oss-cn-hangzhou-internal.aliyuncs.com/path/';
ALTER TABLE logoss ADD PARTITION (year='${hivevar:year}', month='${hivevar:month}', day='${hivevar:day}') location 'oss://akid:aksecret@bucket.oss-cn-hangzhou-internal.aliyuncs.com/path/${hivevar:year}/${hivevar:month}/${hivevar:day}';
CREATE TABLE loghdfs (id string, content string, ip string, oridate string) partitioned by (year string, month string, day string) stored AS textfile;
INSERT OVERWRITE TABLE loghdfs PARTITION (year='${hivevar:year}', month='${hivevar:month}', day='${hivevar:day}') IF NOT EXISTS select split(logcontent,'\\|')[0] as id, split(logcontent,'\\|')[1] as content, split(logcontent,'\\|')[2] as ip, split(logcontent,'\\|')[3] as oridate FROM logoss;
CREATE TABLE userip as select ip, count(id) as count from loghdfs group by ip;
CREATE EXTERNAL TABLE resultoss (ip string, count int) partitioned by (year string, month string, day string) stored AS textfile location 'oss://akid:aksecret@bucket.oss-cn-hangzhou-internal.aliyuncs.com/outpath/';
INSERT OVERWRITE TABLE resultoss PARTITION (year='${hivevar:year}', month='${hivevar:month}', day='${hivevar:day}') IF NOT EXISTS select ip, count FROM userip;
然后你可以把这个作业加到一个周期执行的执行计划中,每天运行一次,就可以完全的自动每天跑数据啦。