开发者社区> 问答> 正文

MaxCompute用户指南:MapReduce:概要:开源兼容MapReduce



MaxCompute(原 ODPS)有一套原生的 MapReduce 编程模型和接口,简单来说,这套接口的输入输出都是MaxCompute 中的 Table,处理的数据是以 Record 为组织形式的,它可以很好地描述 Table 中的数据处理过程。但是与社区的Hadoop 相比,编程接口差异较大。Hadoop 用户如果要将原来的 Hadoop MR 作业迁移到 MaxCompute 的 MR中执行,需要重写 MR 的代码,使用 MaxCompute 的接口进行编译和调试,运行正常后再打成一个 Jar 包才能放到 MaxCompute的平台来运行。这个过程十分繁琐,需要耗费很多的开发和测试人力。如果能够完全不改或者少量地修改原来的 Hadoop MapReduce代码,便可在 MaxCompute 平台上运行,将会比较理想。
现在 MaxCompute 平台提供了一个 Hadoop MapReduce 到 MaxCompute MR的适配工具,已经在一定程度上实现了 Hadoop MapReduce作业的二进制级别的兼容,即您可以在不改代码的情况下通过指定一些配置,便可将原来在 Hadoop 上运行的 MapReduce Jar包拿过来直接运行在 MaxCompute 上。您可通过 此处 下载开发插件。目前该插件处于测试阶段,暂时还不能支持您自定义 comparator 和自定义 key 类型。
下面将以 WordCount 程序为例,为您介绍 HadoopMR 插件的基本使用方式。


注意


下载 HadoopMR 插件


请单击 此处,下载插件,包名为 hadoop2openmr-1.0.jar。

注意
这个 Jar 包中已经包含 hadoop-2.7.2 版本的相关依赖,在作业的 Jar 包中请不要携带 Hadoop 的依赖,避免版本冲突。


准备 Jar 包


编译导出 WordCount 的 Jar 包:wordcount_test.jar,WordCount 程序的源码如下:
  1. package com.aliyun.odps.mapred.example.hadoop;
  2. import org.apache.hadoop.conf.Configuration;
  3. import org.apache.hadoop.fs.Path;
  4. import org.apache.hadoop.io.IntWritable;
  5. import org.apache.hadoop.io.Text;
  6. import org.apache.hadoop.mapreduce.Job;
  7. import org.apache.hadoop.mapreduce.Mapper;
  8. import org.apache.hadoop.mapreduce.Reducer;
  9. import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
  10. import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
  11. import java.io.IOException;
  12. import java.util.StringTokenizer;
  13. public class WordCount {
  14.     public static class TokenizerMapper
  15.         extends Mapper<Object, Text, Text, IntWritable>{
  16.         private final static IntWritable one = new IntWritable(1);
  17.         private Text word = new Text();
  18.         public void map(Object key, Text value, Context context
  19.         ) throws IOException, InterruptedException {
  20.             StringTokenizer itr = new StringTokenizer(value.toString());
  21.             while (itr.hasMoreTokens()) {
  22.                 word.set(itr.nextToken());
  23.                 context.write(word, one);
  24.             }
  25.         }
  26.     }
  27.     public static class IntSumReducer
  28.         extends Reducer<Text,IntWritable,Text,IntWritable> {
  29.         private IntWritable result = new IntWritable();
  30.         public void reduce(Text key, Iterable<IntWritable> values,
  31.             Context context
  32.         ) throws IOException, InterruptedException {
  33.             int sum = 0;
  34.             for (IntWritable val : values) {
  35.                 sum += val.get();
  36.             }
  37.             result.set(sum);
  38.             context.write(key, result);
  39.         }
  40.     }
  41.     public static void main(String[] args) throws Exception {
  42.         Configuration conf = new Configuration();
  43.         Job job = Job.getInstance(conf, "word count");
  44.         job.setJarByClass(WordCount.class);
  45.         job.setMapperClass(TokenizerMapper.class);
  46.         job.setCombinerClass(IntSumReducer.class);
  47.         job.setReducerClass(IntSumReducer.class);
  48.         job.setOutputKeyClass(Text.class);
  49.         job.setOutputValueClass(IntWritable.class);
  50.         FileInputFormat.addInputPath(job, new Path(args[0]));
  51.         FileOutputFormat.setOutputPath(job, new Path(args[1]));
  52.         System.exit(job.waitForCompletion(true) ? 0 : 1);
  53.     }
  54. }


准备测试数据


  1. 创建输入表和输出表。create table if not exists wc_in(line string);
  2. create table if not exists wc_out(key string, cnt bigint);

通过 Tunnel 命令将数据导入输入表中。
需要导入文本文件 data.txt 的数据内容如下:
  1. hello maxcompute
  2. hello mapreduce

您可通过 MaxCompute 客户端 的 Tunnel 命令将 data.txt 的数据导入 wc_in 中,如下所示:
  1. tunnel upload  data.txt wc_in;


准备好表与 HDFS 文件路径的映射关系配置


配置文件命名为:wordcount-table-res.conf
  1. {
  2.   "file:/foo": {
  3.     "resolver": {
  4.       "resolver": "com.aliyun.odps.mapred.hadoop2openmr.resolver.TextFileResolver",
  5.       "properties": {
  6.           "text.resolver.columns.combine.enable": "true",
  7.           "text.resolver.seperator": "\t"
  8.       }
  9.     },
  10.     "tableInfos": [
  11.       {
  12.         "tblName": "wc_in",
  13.         "partSpec": {},
  14.         "label": "__default__"
  15.       }
  16.     ],
  17.     "matchMode": "exact"
  18.   },
  19.   "file:/bar": {
  20.     "resolver": {
  21.       "resolver": "com.aliyun.odps.mapred.hadoop2openmr.resolver.BinaryFileResolver",
  22.       "properties": {
  23.           "binary.resolver.input.key.class" : "org.apache.hadoop.io.Text",
  24.           "binary.resolver.input.value.class" : "org.apache.hadoop.io.LongWritable"
  25.       }
  26.     },
  27.     "tableInfos": [
  28.       {
  29.         "tblName": "wc_out",
  30.         "partSpec": {},
  31.         "label": "__default__"
  32.       }
  33.     ],
  34.     "matchMode": "fuzzy"
  35.   }
  36. }

配置项说明:
整个配置是一个 Json 文件,描述 HDFS 上的文件与 MaxCompute 上的表之间的映射关系,一般要配置输入和输出两部分,一个 HDFS 路径对应一个 resolver 配置,tableInfos 配置以及 matchMode 配置。

  • resolver:用于配置如何对待文件中的数据,目前有com.aliyun.odps.mapred.hadoop2openmr.resolver.TextFileResolver 和com.aliyun.odps.mapred.hadoop2openmr.resolver.BinaryFileResolver 两个内置的resolver 可以选用。除了指定好 resolver 的名字,还需要为相应的 resolver 配置一些 properties指导它正确的进行数据解析。
    TextFileResolver:对于纯文本的数据,输入输出都会当成纯文本对待。当作为输入 resolver 配置时,需要配置的properties 有:text.resolver.columns.combine.enable 和text.resolver.seperator,当text.resolver.columns.combine.enable 配置为 true时,会把输入表的所有列按找 text.resolver.seperator指定的分隔符组合成一个字符串作为输入。否则,会把输入表的前两列分别作为 key,value。

  • BinaryFileResolver:可以处理二进制的数据,自动将数据转换为 MaxCompute可以支持的数据类型,如:Bigint,Bool,Double 等。当作为输出 resolver 配置时,需要配置的 properties有:binary.resolver.input.key.class 和binary.resolver.input.value.class,分别代表中间结果的 key 和 value 类型。

tableInfos:您配置 HDFS 对应的 MaxCompute 的表,目前只支持配置表的名字 tblName,而 partSpec 和 label 请保持和示例一致。
matchMode:路径的匹配模式,可选项为 exact 和 fuzzy,分别代表精确匹配和模糊匹配,如果设置为 fuzzy,则可以通过正则来匹配 HDFS 的输入路径。

作业提交


使用 MaxCompute 命令行工具 odpscmd 提交作业。MaxCompute 命令行工具的安装和配置方法请参见 客户端用户手册。在 odpscmd 下运行如下命令:
  1. jar -DODPS_HADOOPMR_TABLE_RES_CONF=./wordcount-table-res.conf -classpath hadoop2openmr-1.0.jar,wordcount_test.jar com.aliyun.odps.mapred.example.hadoop.WordCount /foo /bar;

这里假设已经将 hadoop2openmr-1.0.jar、wordcount_test.jar 和 wordcount-table-res.conf 放置到 odpscmd 的当前目录,否则在指定配置和 -classpath 的路径时需要做相应的修改。
运行过程如下图所示:

当作业运行完成后,便可查看结果表 wc_out 的内容,验证作业成功结束,结果符合预期。
  

展开
收起
行者武松 2017-10-23 17:38:41 2090 0
0 条回答
写回答
取消 提交回答
问答排行榜
最热
最新

相关电子书

更多
Data+AI时代大数据平台应该如何建设 立即下载
大数据AI一体化的解读 立即下载
极氪大数据 Serverless 应用实践 立即下载