Apache Crunch: Simplifying the Writing of MapReduce Pipeline Programs

Introduction

Apache Crunch provides a set of Java APIs that simplify writing, testing, and running MapReduce pipeline programs. The basic idea behind Crunch is to hide the details of writing MapReduce jobs: following a functional programming style, it defines a set of function-like interfaces. Because Java does not support functional programming directly, these have to be implemented through callbacks, so the resulting code is not as concise or elegant as true functional code, but the logic of a MapReduce program becomes very clear, and it is much easier to write than raw MapReduce code. If you implement a complex pipeline directly with the MapReduce API, you have to work out the details of every job (the contents of each Map and Reduce phase). With the Crunch programming library, you only need to lay out the processing steps of your business logic clearly and call the operators Crunch provides (function-like operations such as union, join, filter, groupBy, sort, and so on).
Below, we briefly go through some of the features Crunch provides:

  • Crunch collections and their operations

Let's first look at the collection types Crunch defines as abstractions for processing distributed data sets, as shown in the class diagram below:

The diagram above shows the method signatures of the collection classes; methods that share a name also have overloaded variants with different parameter lists. These three interfaces form the top-level abstraction of Crunch's collection types; the concrete subclasses can be found in the Crunch source code.
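
The class diagram itself is not reproduced here, so as a rough substitute the abridged Java sketch below lists a few representative methods of the three core interfaces; this selection is mine and far from complete, so refer to the Crunch Javadoc or source for the full definitions.

// Abridged excerpt of the three core collection interfaces (not the complete definitions)
interface PCollection<S> {
    PCollection<S> union(PCollection<S>... collections);
    <T> PCollection<T> parallelDo(DoFn<S, T> doFn, PType<T> type);
    <K, V> PTable<K, V> parallelDo(DoFn<S, Pair<K, V>> doFn, PTableType<K, V> type);
    PCollection<S> filter(FilterFn<S> filterFn);
    PType<S> getPType();
}

interface PTable<K, V> extends PCollection<Pair<K, V>> {
    PGroupedTable<K, V> groupByKey();
    <U> PTable<K, Pair<V, U>> join(PTable<K, U> other); // inner join by default
    PCollection<K> keys();
    PCollection<V> values();
}

interface PGroupedTable<K, V> extends PCollection<Pair<K, Iterable<V>>> {
    PTable<K, V> combineValues(CombineFn<K, V> combineFn);
    PTable<K, V> ungroup();
}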

  • Join operations

In the class diagram above, the PTable interface has a join method, which by default performs an inner join (INNER JOIN). Crunch also provides a utility class, Join, that implements the various join operations through the static methods shown below:

public static <K, U, V> PTable<K, Pair<U, V>> join(PTable<K, U> left, PTable<K, V> right)
public static <K, U, V> PTable<K, Pair<U, V>> innerJoin(PTable<K, U> left, PTable<K, V> right)
public static <K, U, V> PTable<K, Pair<U, V>> leftJoin(PTable<K, U> left, PTable<K, V> right)
public static <K, U, V> PTable<K, Pair<U, V>> rightJoin(PTable<K, U> left, PTable<K, V> right)
public static <K, U, V> PTable<K, Pair<U, V>> fullJoin(PTable<K, U> left, PTable<K, V> right)
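
As a minimal sketch of how these static methods are called (the orders and users tables here, and their value types, are hypothetical and not part of the example programs below):

import org.apache.crunch.PTable;
import org.apache.crunch.Pair;
import org.apache.crunch.lib.Join;

public class JoinSketch {
    // orders: key -> order amount, users: key -> user name (hypothetical inputs)
    static PTable<String, Pair<Integer, String>> joinExamples(
            PTable<String, Integer> orders, PTable<String, String> users) {
        PTable<String, Pair<Integer, String>> inner = Join.innerJoin(orders, users); // only keys present on both sides
        PTable<String, Pair<Integer, String>> left = Join.leftJoin(orders, users);   // also keeps unmatched left keys
        PTable<String, Pair<Integer, String>> full = Join.fullJoin(orders, users);   // keeps unmatched keys from both sides
        return inner;
    }
}
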
  • Sorting

Sorting is provided by the Sort utility class, which exposes the following static methods:

public static <T> PCollection<T> sort(PCollection<T> collection)
public static <T> PCollection<T> sort(PCollection<T> collection, Order order)
public static <T> PCollection<T> sort(PCollection<T> collection, int numReducers, Order order)
public static <K, V> PTable<K, V> sort(PTable<K, V> table)
public static <K, V> PTable<K, V> sort(PTable<K, V> table, Order key)
public static <K, V> PTable<K, V> sort(PTable<K, V> table, int numReducers, Order key)
public static <U, V> PCollection<Pair<U, V>> sortPairs(PCollection<Pair<U, V>> collection, ColumnOrder... columnOrders)
public static <V1, V2, V3> PCollection<Tuple3<V1, V2, V3>> sortTriples(PCollection<Tuple3<V1, V2, V3>> collection, ColumnOrder... columnOrders)
public static <V1, V2, V3, V4> PCollection<Tuple4<V1, V2, V3, V4>> sortQuads(PCollection<Tuple4<V1, V2, V3, V4>> collection, ColumnOrder... columnOrders)
public static <T extends Tuple> PCollection<T> sortTuples(PCollection<T> collection, ColumnOrder... columnOrders)
public static <T extends Tuple> PCollection<T> sortTuples(PCollection<T> collection, int numReducers, ColumnOrder... columnOrders)

As you can see, sorting is very convenient; later we will walk through an example that uses it.
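
Before that, here is a minimal standalone sketch of the plain sort variants (the words and counts inputs are hypothetical):

import org.apache.crunch.PCollection;
import org.apache.crunch.PTable;
import org.apache.crunch.lib.Sort;
import org.apache.crunch.lib.Sort.Order;

public class SortSketch {
    static void sortExamples(PCollection<String> words, PTable<String, Long> counts) {
        // Natural (ascending) total order over all elements
        PCollection<String> ascending = Sort.sort(words);
        // Explicit ordering plus the number of reducers to use
        PCollection<String> descending = Sort.sort(words, 4, Order.DESCENDING);
        // Sort a PTable by its keys
        PTable<String, Long> byKey = Sort.sort(counts, Order.ASCENDING);
    }
}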

  • Secondary sort (SecondarySort)

Secondary sort is implemented by the SecondarySort utility class, again as a set of static methods:

public static <K, V1, V2, T> PCollection<T> sortAndApply(PTable<K, Pair<V1, V2>> input, DoFn<Pair<K, Iterable<Pair<V1, V2>>>, T> doFn, PType<T> ptype)
public static <K, V1, V2, T> PCollection<T> sortAndApply(PTable<K, Pair<V1, V2>> input, DoFn<Pair<K, Iterable<Pair<V1, V2>>>, T> doFn, PType<T> ptype, int numReducers)
public static <K, V1, V2, U, V> PTable<U, V> sortAndApply(PTable<K, Pair<V1, V2>> input, DoFn<Pair<K, Iterable<Pair<V1, V2>>>, Pair<U, V>> doFn, PTableType<U, V> ptype)
public static <K, V1, V2, U, V> PTable<U, V> sortAndApply(PTable<K, Pair<V1, V2>> input, DoFn<Pair<K, Iterable<Pair<V1, V2>>>, Pair<U, V>> doFn, PTableType<U, V> ptype, int numReducers)
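
As a minimal sketch of how sortAndApply might be used (the event data and field names are hypothetical): the input PTable is keyed by the grouping key, the first element of each value pair is the field to sort on within a group, and the DoFn receives each key's values already ordered by that field.

import org.apache.crunch.DoFn;
import org.apache.crunch.Emitter;
import org.apache.crunch.PCollection;
import org.apache.crunch.PTable;
import org.apache.crunch.Pair;
import org.apache.crunch.lib.SecondarySort;
import org.apache.crunch.types.writable.Writables;

public class SecondarySortSketch {
    // Input: userId -> (timestamp, pageUrl); output: one "userId:firstPage" string per user,
    // where firstPage is the page with the smallest timestamp.
    static PCollection<String> firstPagePerUser(PTable<String, Pair<Long, String>> events) {
        return SecondarySort.sortAndApply(events,
                new DoFn<Pair<String, Iterable<Pair<Long, String>>>, String>() {
                    @Override
                    public void process(Pair<String, Iterable<Pair<Long, String>>> input,
                            Emitter<String> emitter) {
                        // Values arrive sorted by timestamp, so the first one is the earliest
                        for (Pair<Long, String> event : input.second()) {
                            emitter.emit(input.first() + ":" + event.second());
                            break;
                        }
                    }
                }, Writables.strings());
    }
}
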
  • Distinct

The Distinct utility class provides the following static methods:

public static <S> PCollection<S> distinct(PCollection<S> input)
public static <K, V> PTable<K, V> distinct(PTable<K, V> input)
public static <S> PCollection<S> distinct(PCollection<S> input, int flushEvery)
public static <K, V> PTable<K, V> distinct(PTable<K, V> input, int flushEvery)
  • Sampling

The Sample utility class provides a set of static methods, shown below:

public static <S> PCollection<S> sample(PCollection<S> input, double probability)
public static <S> PCollection<S> sample(PCollection<S> input, Long seed, double probability)
public static <K, V> PTable<K, V> sample(PTable<K, V> input, double probability)
public static <K, V> PTable<K, V> sample(PTable<K, V> input, Long seed, double probability)
public static <T> PCollection<T> reservoirSample(PCollection<T> input, int sampleSize)
public static <T> PCollection<T> reservoirSample(PCollection<T> input, int sampleSize, Long seed)
public static <T, N extends Number> PCollection<T> weightedReservoirSample(PCollection<Pair<T, N>> input, int sampleSize)
public static <T, N extends Number> PCollection<T> weightedReservoirSample(PCollection<Pair<T, N>> input, int sampleSize, Long seed)
public static <T, N extends Number> PCollection<Pair<Integer, T>> groupedWeightedReservoirSample(PTable<Integer, Pair<T, N>> input, int[] sampleSizes)
public static <T, N extends Number> PCollection<Pair<Integer, T>> groupedWeightedReservoirSample(PTable<Integer, Pair<T, N>> input, int[] sampleSizes, Long seed)
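
For example, a minimal sketch of independent and reservoir sampling (the lines input is hypothetical):

import org.apache.crunch.PCollection;
import org.apache.crunch.lib.Sample;

public class SampleSketch {
    static void sampleExamples(PCollection<String> lines) {
        // Keep each element independently with probability 0.01 (roughly 1% of the input)
        PCollection<String> onePercent = Sample.sample(lines, 0.01);
        // Same, but with a fixed seed for reproducible runs
        PCollection<String> seeded = Sample.sample(lines, 12345L, 0.01);
        // Draw exactly 1000 elements uniformly at random (reservoir sampling)
        PCollection<String> fixedSize = Sample.reservoirSample(lines, 1000);
    }
}
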
  • Running existing (native) MapReduce programs

The Mapreduce and Mapred utility classes let you run MapReduce code you already have; their static methods are shown below:

// The Mapreduce utility class
public static <K1, V1, K2 extends Writable, V2 extends Writable> PTable<K2, V2> map(
      PTable<K1, V1> input,
      Class<? extends Mapper<K1, V1, K2, V2>> mapperClass,
      Class<K2> keyClass, Class<V2> valueClass)
public static <K1, V1, K2 extends Writable, V2 extends Writable> PTable<K2, V2> reduce(
      PGroupedTable<K1, V1> input,
      Class<? extends Reducer<K1, V1, K2, V2>> reducerClass,
      Class<K2> keyClass, Class<V2> valueClass)

// The Mapred utility class
public static <K1, V1, K2 extends Writable, V2 extends Writable> PTable<K2, V2> map(
      PTable<K1, V1> input,
      Class<? extends Mapper<K1, V1, K2, V2>> mapperClass,
      Class<K2> keyClass, Class<V2> valueClass)
public static <K1, V1, K2 extends Writable, V2 extends Writable> PTable<K2, V2> reduce(
      PGroupedTable<K1, V1> input,
      Class<? extends Reducer<K1, V1, K2, V2>> reducerClass,
      Class<K2> keyClass, Class<V2> valueClass)
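
As an illustrative sketch, the two stand-in classes below play the role of a new-API Mapper and Reducer that you already have (their names and logic are hypothetical), and the reuse method wires them into a Crunch pipeline through the Mapreduce helper:

import java.io.IOException;

import org.apache.crunch.PGroupedTable;
import org.apache.crunch.PTable;
import org.apache.crunch.lib.Mapreduce;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;

public class ReuseMapReduceSketch {

    // Stand-in for an existing new-API Mapper: line -> (token, 1)
    public static class TokenMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
        private static final IntWritable ONE = new IntWritable(1);
        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            for (String token : value.toString().split("\\s+")) {
                context.write(new Text(token), ONE);
            }
        }
    }

    // Stand-in for an existing new-API Reducer: (token, counts) -> (token, sum)
    public static class SumReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
        @Override
        protected void reduce(Text key, Iterable<IntWritable> values, Context context)
                throws IOException, InterruptedException {
            int sum = 0;
            for (IntWritable v : values) {
                sum += v.get();
            }
            context.write(key, new IntWritable(sum));
        }
    }

    // input: file offset -> line, e.g. as produced by a text source
    static PTable<Text, IntWritable> reuse(PTable<LongWritable, Text> input) {
        PTable<Text, IntWritable> mapped =
                Mapreduce.map(input, TokenMapper.class, Text.class, IntWritable.class);
        PGroupedTable<Text, IntWritable> grouped = mapped.groupByKey();
        return Mapreduce.reduce(grouped, SumReducer.class, Text.class, IntWritable.class);
    }
}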

Next, we implement two examples to get a feel for the difference between programming with Crunch and with the raw MapReduce API.

WordCount: computing word frequencies

The WordCount program we want to implement, expressed as a sequence of Crunch operations, consists of the following steps:

  1. Read a directory of text files from HDFS
  2. Split each line on whitespace into <word, per-occurrence count> pairs (the Map phase of a MapReduce job)
  3. Group the resulting collection by key
  4. Reduce the groups to obtain each word's total frequency, <word, global count> (the Reduce phase of a MapReduce job)
  5. Sort the result set by global word count in descending order
  6. Write the result back to HDFS

Based on these steps, the implementation using the Crunch library looks like this:

package org.shirdrn.crunch.examples;

import static org.apache.crunch.types.writable.Writables.ints;
import static org.apache.crunch.types.writable.Writables.strings;
import static org.apache.crunch.types.writable.Writables.tableOf;

import java.io.Serializable;
import java.util.Iterator;

import org.apache.crunch.CombineFn;
import org.apache.crunch.DoFn;
import org.apache.crunch.Emitter;
import org.apache.crunch.PCollection;
import org.apache.crunch.PGroupedTable;
import org.apache.crunch.PTable;
import org.apache.crunch.Pair;
import org.apache.crunch.Pipeline;
import org.apache.crunch.PipelineResult;
import org.apache.crunch.impl.mr.MRPipeline;
import org.apache.crunch.lib.Sort;
import org.apache.crunch.lib.Sort.ColumnOrder;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

import com.google.common.base.Strings;

public class WordCount extends Configured implements Tool, Serializable {

    private static final long serialVersionUID = 1L;

    @Override
    public int run(String[] args) throws Exception {
        if (args.length != 2) {
            System.err.println("Usage: hadoop jar crunch-0.0.1-SNAPSHOT.jar " +
                    WordCount.class.getName() + " <input> <output>");
            return 1;
        }

        String inputPath = args[0];
        String outputPath = args[1];

        // Create a pipeline & read a text file
        Pipeline pipeline = new MRPipeline(WordCount.class, getConf());
        PCollection<String> lines = pipeline.readTextFile(inputPath);

        // map
        PTable<String, Integer> mappedWords = map(lines);

        // group by key
        PGroupedTable<String, Integer> groupedWords = mappedWords.groupByKey();

        // reduce
        PTable<String, Integer> reducedWords = reduce(groupedWords);

        // sort by count, descending
        PCollection<Pair<String, Integer>> sortedValues =
                Sort.sortPairs(reducedWords, ColumnOrder.by(2, Sort.Order.DESCENDING));

        // write the result to a text file
        pipeline.writeTextFile(sortedValues, outputPath);

        // Execute the pipeline as a MapReduce job
        PipelineResult result = pipeline.done();
        return result.succeeded() ? 0 : 1;
    }

    private final PTable<String, Integer> map(PCollection<String> lines) {
        PTable<String, Integer> mappedWords = lines.parallelDo(new DoFn<String, Pair<String, Integer>>() {
            private static final long serialVersionUID = 1L;
            private static final String PATTERN = "\\s+";
            @Override
            public void process(String input, Emitter<Pair<String, Integer>> emitter) {
                if (!Strings.isNullOrEmpty(input)) {
                    for (String word : input.split(PATTERN)) {
                        if (!Strings.isNullOrEmpty(word)) {
                            emitter.emit(Pair.of(word, 1));
                        }
                    }
                }
            }
        }, tableOf(strings(), ints()));
        return mappedWords;
    }

    private final PTable<String, Integer> reduce(PGroupedTable<String, Integer> groupedWords) {
        PTable<String, Integer> reducedWords = groupedWords.combineValues(new CombineFn<String, Integer>() {
            private static final long serialVersionUID = 1L;
            @Override
            public void process(Pair<String, Iterable<Integer>> values, Emitter<Pair<String, Integer>> emitter) {
                int count = 0;
                Iterator<Integer> iter = values.second().iterator();
                while (iter.hasNext()) {
                    count += iter.next();
                }
                emitter.emit(Pair.of(values.first(), count));
            }
        });
        return reducedWords;
    }

    public static void main(String[] args) throws Exception {
        ToolRunner.run(new Configuration(), new WordCount(), args);
    }

}

In the code above, the run method makes it clear what each step in the ordered sequence has to do to complete the computation. Had we written the same thing with the MapReduce API, we could easily get bogged down in the mechanics of that API and lose an intuitive picture of the overall flow; for developers new to MapReduce this is especially hard to follow.
The program is launched with the same command format as one written directly against the MapReduce API. To run our Crunch-based program, execute the following command:

hadoop jar crunch-0.0.1-SNAPSHOT.jar org.shirdrn.crunch.examples.WordCount /data/crunch/in /data/crunch/out

The console output of the run looks like this:

15/03/06 15:48:44 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
15/03/06 15:48:45 INFO impl.FileTargetImpl: Will write output files to new path: /data/crunch/out
15/03/06 15:48:45 INFO Configuration.deprecation: mapred.job.tracker is deprecated. Instead, use mapreduce.jobtracker.address
15/03/06 15:48:46 INFO collect.PGroupedTableImpl: Setting num reduce tasks to 2
15/03/06 15:48:46 INFO client.RMProxy: Connecting to ResourceManager at h1/192.168.4.142:8032
15/03/06 15:48:47 INFO Configuration.deprecation: dfs.block.size is deprecated. Instead, use dfs.blocksize
15/03/06 15:48:47 INFO input.FileInputFormat: Total input paths to process : 1
15/03/06 15:48:47 INFO input.CombineFileInputFormat: DEBUG: Terminated node allocation with : CompletedNodes: 2, size left: 51480009
15/03/06 15:48:47 INFO mapreduce.JobSubmitter: number of splits:27
15/03/06 15:48:47 INFO mapreduce.JobSubmitter: Submitting tokens for job: job_1422867923292_0012
15/03/06 15:48:48 INFO impl.YarnClientImpl: Submitted application application_1422867923292_0012
15/03/06 15:48:48 INFO mapreduce.Job: The url to track the job: http://h1:8088/proxy/application_1422867923292_0012/
15/03/06 15:48:48 INFO jobcontrol.CrunchControlledJob: Running job "org.shirdrn.crunch.examples.WordCount: Text(/data/crunch/in)+S0+GBK+combine+S1+SeqFile(/tmp/crun... ID=1 (1/2)"
15/03/06 15:48:48 INFO jobcontrol.CrunchControlledJob: Job status available at: http://h1:8088/proxy/application_1422867923292_0012/
15/03/06 15:54:57 INFO client.RMProxy: Connecting to ResourceManager at h1/192.168.4.142:8032
15/03/06 15:54:58 INFO input.FileInputFormat: Total input paths to process : 2
15/03/06 15:54:58 INFO mapreduce.JobSubmitter: number of splits:24
15/03/06 15:54:58 INFO mapreduce.JobSubmitter: Submitting tokens for job: job_1422867923292_0013
15/03/06 15:54:58 INFO impl.YarnClientImpl: Submitted application application_1422867923292_0013
15/03/06 15:54:58 INFO mapreduce.Job: The url to track the job: http://h1:8088/proxy/application_1422867923292_0013/
15/03/06 15:54:58 INFO jobcontrol.CrunchControlledJob: Running job "org.shirdrn.crunch.examples.WordCount: SeqFile(/tmp/crunch-1592377036/p1)+GBK+ungroup+PTables.va... ID=2 (2/2)"
15/03/06 15:54:58 INFO jobcontrol.CrunchControlledJob: Job status available at: http://h1:8088/proxy/application_1422867923292_0013/

When our Crunch-based WordCount program runs on the Hadoop cluster, it is ultimately translated into two jobs (application_1422867923292_0012 and application_1422867923292_0013), which can also be seen on the YARN applications page, as shown in the figure:


After the run succeeds, the result can be inspected from the command line; it looks like this:

[hadoop@h1 ~]$ hdfs dfs -cat /data/crunch/out/part-r-00000 | head
15/03/06 16:24:42 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
[0,31498202]
[2.3.14,6696801]
[1,4099030]
[2.3.3,3364573]
[2.2.1,3204483]
[appstore,2896280]
[2.1.1,2664336]
[3.0.3,2246132]
[2.3.9,1669198]
[3.0.5,1596954]

As you can see, the output matches what the program was designed to produce.

A table join program

Writing a join with the MapReduce API is quite involved; without a solid understanding of how MapReduce executes, implementing it can be difficult, and the finished code tends not to be very readable. Below we use the Crunch API to implement a join of two tables. First, we prepare the data files:

  • User info table

The user info table is stored in the file user_info.txt and comes from a mobile application. It has two TAB-separated fields: the first is the user ID, the second is the promotion channel the user came from. Sample lines look like this:

86756ed5cccd69494f6b94fc31638835        c9049310
c4b0657cf93fc4f05c87a76077e50090        c0000341
d5c33ce0a110ca3e2def912506bcd846        c9049310
7ed745aa273b67b7e9189256140c7220        c0087001
d31908a762c49c68826f9f5addb345cb        c0000106
dd3ced7cf15ba397f84eb3389b6ffc11        c0049146
c60bc23264e61e3ba553f6127138c77a        c6201204
a892a2001947d872435edddad44da7d7        c9049310
d31908a762c49c68826f9f5addb345cb        c9001500
dd3ced7cf15ba397f84eb3389b6ffc11        c6820012
c60bc23264e61e3ba553f6127138c77a        c0055102
a892a2001947d872435edddad44da7d7        c9049310
847383774f24b1904a9ada6af7028f52        c0049146
443f9f5c9f8c16a8a676bb567451a65f        c6820012
a6b88a8ad016af214ba30aef03b450eb        c0055102
f57cab9d9712e9884a5f4a373af8b220        c0055102
3762865e195e7885e41fe748671657c2        c0049146
34323bfbfe3ea8561952769af0bc117f        c9000541
9b4e112db554d44ae5c8140a40f7d389        c0049146
f26fd561ab357cf2aad041eaa17c30b4        c0055102
2812d7c1641580784b19f7ef1b1fdbf2        c6820012
7ed745aa273b67b7e9189256140c7220        c9049310
19bacba14597d2e8994ce50a5d47c2b4        c0055102
B9BEDD6A957377FDEA2A78D4B614531A        c9000201
7aee01193033491f12342a7b9dbbe4a3        c0000106
77b80cfc95b04ba427412ae5fc276d6f        c0055102
e1f7c264271e1ee2e2a78d0798354d89        c6820012
68aeafdfd07e85b41faafb33fc63600c        c0055102
556BB4023B23D4C8DD70E81FDD270121        c0049146
d31908a762c49c68826f9f5addb345cb        c0055102
14b769541197dc391767ddbce878393b        c0055102
c60bc23264e61e3ba553f6127138c77a        c0000106
2708bb4bb8f967c607bc82749d892ebf        c0055102
70F9442DAE5539ED99E280F12AFA4EC1        c9049310
  • Channel info table

The channel info table is stored in the file channel_info.txt. Each line has two TAB-separated fields: the channel ID and the channel name. Sample lines look like this:

c0000341        Anet
c9049310        Bwater
c0087001        cks005
c0000106        bjrow
c6201204        nabEer
c9001500        moppppy
c0055102        susu
c0049146        pwest
c6820012        9square
c9000541        house8
c9000201        netfyy

Because the user table only stores the channel ID, and reports need to show the channel name, we join the two tables on the channel ID so that each user's channel name can be displayed in the report.
Implementing this requirement with Crunch is simple and clear; the code is shown below:

package org.shirdrn.crunch.examples;

import java.io.Serializable;

import org.apache.crunch.DoFn;
import org.apache.crunch.Emitter;
import org.apache.crunch.PCollection;
import org.apache.crunch.PTable;
import org.apache.crunch.Pair;
import org.apache.crunch.Pipeline;
import org.apache.crunch.PipelineResult;
import org.apache.crunch.impl.mr.MRPipeline;
import org.apache.crunch.lib.Join;
import org.apache.crunch.types.PTypeFamily;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class JoinUserChannel extends Configured implements Tool, Serializable {

    private static final long serialVersionUID = 1L;

    @Override
    public int run(String[] args) throws Exception {
        if (args.length != 3) {
            System.err.println("Usage: hadoop jar crunch-0.0.1-SNAPSHOT.jar " +
                    JoinUserChannel.class.getName() + " <user_input> <channel_input> <output>");
            return 1;
        }

        String userPath = args[0];
        String channelPath = args[1];
        String outputPath = args[2];

        // Create a pipeline & read 2 text files
        Pipeline pipeline = new MRPipeline(JoinUserChannel.class, getConf());

        // user data
        PCollection<String> users = pipeline.readTextFile(userPath);
        PTypeFamily uTF = users.getTypeFamily();
        PTable<String, String> left = users.parallelDo(new DoFn<String, Pair<String, String>>() {
            private static final long serialVersionUID = 1L;
            @Override
            public void process(String input, Emitter<Pair<String, String>> emitter) {
                String[] kv = input.split("\\s+");
                if (kv.length == 2) {
                    String userId = kv[0];
                    String channelId = kv[1].trim();
                    emitter.emit(Pair.of(channelId, userId)); // key=channelId, value=userId
                }
            }

        }, uTF.tableOf(uTF.strings(), uTF.strings()));

        // channel data
        PCollection<String> channels = pipeline.readTextFile(channelPath);
        PTypeFamily cTF = channels.getTypeFamily();
        PTable<String, String> right = channels.parallelDo(new DoFn<String, Pair<String, String>>() {
            private static final long serialVersionUID = 1L;
            @Override
            public void process(String input, Emitter<Pair<String, String>> emitter) {
                String[] kv = input.split("\\s+");
                if (kv.length == 2) {
                    String channelId = kv[0].trim();
                    String channelName = kv[1];
                    emitter.emit(Pair.of(channelId, channelName)); // key=channelId, value=channelName
                }
            }

        }, cTF.tableOf(cTF.strings(), cTF.strings()));

        // join 2 tables & write to HDFS
        PTable<String, Pair<String, String>> joinedResult = Join.innerJoin(left, right);
        pipeline.writeTextFile(joinedResult, outputPath);

        // Execute the pipeline as a MapReduce.
        PipelineResult result = pipeline.done();
        return result.succeeded() ? 0 : 1;
    }

    public static void main(String[] args) throws Exception {
        ToolRunner.run(new Configuration(), new JoinUserChannel(), args);
    }

}

We first read each of the two files into a PCollection<String>, turn them into PTable<String, String> collections, and then call the Join utility class's innerJoin method to perform an inner join, which returns a PTable<String, Pair<String, String>> that is finally written to HDFS. For an inner join you can also call the join method on the PTable<String, String> itself, which does the same thing as the Join utility's innerJoin and join methods.
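
For instance, the inner join in the example above could equally be written through the PTable method itself, reusing the left and right tables from the code above:

// Equivalent to Join.join(left, right) / Join.innerJoin(left, right)
PTable<String, Pair<String, String>> joinedResult = left.join(right);
pipeline.writeTextFile(joinedResult, outputPath);
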
To run the program, execute the following command:

hadoop jar crunch-0.0.1-SNAPSHOT.jar org.shirdrn.crunch.examples.JoinUserChannel /data/crunch/user/user_info.txt /data/crunch/user/channel_info.txt /data/crunch/user/joined

The output can be inspected as follows:

[hadoop@h1 crunch]$ hdfs dfs -ls /data/crunch/user/joined
15/03/06 18:53:42 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Found 2 items
-rw-r--r--   3 hadoop supergroup          0 2015-03-06 18:53 /data/crunch/user/joined/_SUCCESS
-rw-r--r--   3 hadoop supergroup       1777 2015-03-06 18:53 /data/crunch/user/joined/part-r-00000
[hadoop@h1 crunch]$ hdfs dfs -cat /data/crunch/user/joined/part-r-00000 | head
15/03/06 18:54:13 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
[c0000106,[7aee01193033491f12342a7b9dbbe4a3,bjrow]]
[c0000106,[c60bc23264e61e3ba553f6127138c77a,bjrow]]
[c0000106,[d31908a762c49c68826f9f5addb345cb,bjrow]]
[c0000341,[c4b0657cf93fc4f05c87a76077e50090,Anet]]
[c0049146,[3762865e195e7885e41fe748671657c2,pwest]]
[c0049146,[556BB4023B23D4C8DD70E81FDD270121,pwest]]
[c0049146,[9b4e112db554d44ae5c8140a40f7d389,pwest]]
[c0049146,[dd3ced7cf15ba397f84eb3389b6ffc11,pwest]]
[c0049146,[847383774f24b1904a9ada6af7028f52,pwest]]
[c0055102,[f26fd561ab357cf2aad041eaa17c30b4,susu]]

Summary

The two examples above simply use Crunch to develop two MapReduce programs; whether you are an experienced developer or a beginner, it is easy to get started.
Crunch is still at an early stage. The latest release is 0.11, which has only been tested against Hadoop 2.2.0 and a few earlier releases, so running it on newer versions may expose some small issues or rough edges. The example programs above were run on Hadoop 2.6.0; for instance, when the block size is read from the Hadoop configuration on the node that submits the job, the newer suffixed forms such as 64m or 2g are not accepted, and the value must be given as a plain numeric byte count.
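
For example, a plain byte count can be set on the client-side Configuration that is handed to the pipeline; the snippet below is only a hypothetical illustration (134217728 bytes = 128 MB):

import org.apache.hadoop.conf.Configuration;

// Use a numeric byte count rather than a suffixed value such as "64m",
// which the Crunch version used here may fail to parse.
Configuration conf = new Configuration();
conf.set("dfs.blocksize", "134217728"); // 128 MB expressed in bytes
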
Future iterations of Crunch will surely make its features more complete and its API even more concise, especially around runtime tuning: Crunch's goal is for programs to run efficiently with almost no complex tuning configuration, which is well worth looking forward to.
In addition, Crunch also supports the Spark execution engine; for details on using it with Spark, see the official documentation.
