Apache Crunch提供了一套Java API,能够简化编写、测试、运行MapReduce Pipeline程序。Crunch的基本思想是隐藏编写MapReduce程序的细节,基于函数式编程的思想,定义了一套函数式编程接口。因为Java(在Java 8之前)并不支持函数式编程,只能通过回调的方式来实现,虽然写起来代码不够美观简洁,但是编写MapReduce程序的思路是非常清晰的,而且比编写原生的MapReduce程序要容易得多。如果直接使用MapReduce API编写一个复杂的Pipeline程序,可能需要考虑好每个Job的细节(Map和Reduce的实现内容),而使用Crunch编程库来编写,只需要清晰地控制好要实现的业务逻辑处理的操作流程,调用Crunch提供的接口(类似函数式操作的算子,如union、join、filter、groupBy、sort等等)即可。
上面类图中,PTable接口中有一个join方法,默认实现的是内连接(INNER JOIN)。Crunch还提供了一个实现各种join操作的工具类org.apache.crunch.lib.Join,这个类中包含了join相关的静态方法,如下所示:
1 |
public static <K, U, V> PTable<K, Pair<U, V>> join(PTable<K, U> left, PTable<K, V> right) |
2 |
public static <K, U, V> PTable<K, Pair<U, V>> innerJoin(PTable<K, U> left, PTable<K, V> right) |
3 |
public static <K, U, V> PTable<K, Pair<U, V>> leftJoin(PTable<K, U> left, PTable<K, V> right) |
4 |
public static <K, U, V> PTable<K, Pair<U, V>> rightJoin(PTable<K, U> left, PTable<K, V> right) |
5 |
public static <K, U, V> PTable<K, Pair<U, V>> fullJoin(PTable<K, U> left, PTable<K, V> right) |
01 |
public static <T> PCollection<T> sort(PCollection<T> collection) |
02 |
public static <T> PCollection<T> sort(PCollection<T> collection, Order order) |
03 |
public static <T> PCollection<T> sort(PCollection<T> collection, int numReducers, Order order) |
04 |
public static <K, V> PTable<K, V> sort(PTable<K, V> table) |
05 |
public static <K, V> PTable<K, V> sort(PTable<K, V> table, Order key) |
06 |
public static <K, V> PTable<K, V> sort(PTable<K, V> table, int numReducers, Order key) |
07 |
public static <U, V> PCollection<Pair<U, V>> sortPairs(PCollection<Pair<U, V>> collection, ColumnOrder... columnOrders) |
08 |
public static <V1, V2, V3> PCollection<Tuple3<V1, V2, V3>> sortTriples(PCollection<Tuple3<V1, V2, V3>> collection, ColumnOrder... columnOrders) |
09 |
public static <V1, V2, V3, V4> PCollection<Tuple4<V1, V2, V3, V4>> sortQuads(PCollection<Tuple4<V1, V2, V3, V4>> collection, ColumnOrder... columnOrders) |
10 |
public static <V1, V2, V3, V4> PCollection<Tuple4<V1, V2, V3, V4>> sortQuads(Collection<Tuple4<V1, V2, V3, V4>> collection, ColumnOrder... columnOrders) |
11 |
public static <T extends Tuple> PCollection<T> sortTuples(PCollection<T> collection, ColumnOrder... columnOrders) |
12 |
public static <T extends Tuple> PCollection<T> sortTuples(PCollection<T> collection, int numReducers, ColumnOrder... columnOrders) |
1 |
public static <K, V1, V2, T> PCollection<T> sortAndApply(PTable<K, Pair<V1, V2>> input, DoFn<Pair<K, Iterable<Pair<V1, V2>>>, T> doFn, PType<T> ptype) |
2 |
public static <K, V1, V2, T> PCollection<T> sortAndApply(PTable<K, Pair<V1, V2>> input, DoFn<Pair<K, Iterable<Pair<V1, V2>>>, T> doFn, PType<T> ptype, int numReducers) |
3 |
public static <K, V1, V2, U, V> PTable<U, V> sortAndApply(PTable<K, Pair<V1, V2>> input, DoFn<Pair<K, Iterable<Pair<V1, V2>>>, Pair<U, V>> doFn, PTableType<U, V> ptype) |
4 |
public static <K, V1, V2, U, V> PTable<U, V> sortAndApply(PTable<K, Pair<V1, V2>> input, DoFn<Pair<K, Iterable<Pair<V1, V2>>>, Pair<U, V>> doFn, PTableType<U, V> ptype, int numReducers) |
1 |
public static <S> PCollection<S> distinct(PCollection<S> input) |
2 |
public static <K, V> PTable<K, V> distinct(PTable<K, V> input) |
3 |
public static <S> PCollection<S> distinct(PCollection<S> input, int flushEvery) |
4 |
public static <K, V> PTable<K, V> distinct(PTable<K, V> input, int flushEvery) |
01 |
public static <S> PCollection<S> sample(PCollection<S> input, double probability) |
02 |
public static <S> PCollection<S> sample(PCollection<S> input, Long seed, double probability) |
03 |
public static <K, V> PTable<K, V> sample(PTable<K, V> input, double probability) |
04 |
public static <K, V> PTable<K, V> sample(PTable<K, V> input, Long seed, double probability) |
05 |
public static <T> PCollection<T> reservoirSample(PCollection<T> input, int sampleSize) |
06 |
public static <T> PCollection<T> reservoirSample(PCollection<T> input, int sampleSize, Long seed) |
07 |
public static <T, N extends Number> PCollection<T> weightedReservoirSample(PCollection<Pair<T, N>> input, int sampleSize) |
08 |
public static <T, N extends Number> PCollection<T> weightedReservoirSample(PCollection<Pair<T, N>> input, int sampleSize, Long seed) |
09 |
public static <T, N extends Number> PCollection<Pair<Integer, T>> groupedWeightedReservoirSample( PTable<Integer, Pair<T, N>> input, int [] sampleSizes) |
10 |
public static <T, N extends Number> PCollection<Pair<Integer, T>> groupedWeightedReservoirSample(PTable<Integer, Pair<T, N>> input, int [] sampleSizes, Long seed) |
02 |
public static <K1, V1, K2 extends Writable, V2 extends Writable> PTable<K2, V2> map( |
03 |
PTable<K1, V1> input, |
04 |
Class<? extends Mapper<K1, V1, K2, V2>> mapperClass, |
05 |
Class<K2> keyClass, Class<V2> valueClass) |
06 |
public static <K1, V1, K2 extends Writable, V2 extends Writable> PTable<K2, V2> reduce( |
07 |
PGroupedTable<K1, V1> input, |
08 |
Class<? extends Reducer<K1, V1, K2, V2>> reducerClass, |
09 |
Class<K2> keyClass, Class<V2> valueClass) |
12 |
public static <K1, V1, K2 extends Writable, V2 extends Writable> PTable<K2, V2> map( |
13 |
PTable<K1, V1> input, |
14 |
Class<? extends Mapper<K1, V1, K2, V2>> mapperClass, |
15 |
Class<K2> keyClass, Class<V2> valueClass) |
16 |
public static <K1, V1, K2 extends Writable, V2 extends Writable> PTable<K2, V2> reduce( |
17 |
PGroupedTable<K1, V1> input, |
18 |
Class<? extends Reducer<K1, V1, K2, V2>> reducerClass, |
19 |
Class<K2> keyClass, Class<V2> valueClass) |
下面,我们通过实现两个例子,来体验使用Crunch和原生MapReduce API编程的不同。
- 从HDFS上读取一个保存Text文件的目录
- 将文件中每行内容根据空格分隔,分成<word, 单个单词词频>对(MapReduce程序的Map阶段)
- 将得到的集合按照key分组
- 化简结果得到每个单词的频率计数<word, 全局词频>(MapReduce程序的Reduce阶段)
- 根据单词全局词频计数,得到降序排序的结果集
- 输出结果到HDFS中
001 |
package org.shirdrn.crunch.examples; |
003 |
import static org.apache.crunch.types.writable.Writables.ints; |
004 |
import static org.apache.crunch.types.writable.Writables.strings; |
005 |
import static org.apache.crunch.types.writable.Writables.tableOf; |
007 |
import java.io.Serializable; |
008 |
import java.util.Iterator; |
010 |
import org.apache.crunch.CombineFn; |
011 |
import org.apache.crunch.DoFn; |
012 |
import org.apache.crunch.Emitter; |
013 |
import org.apache.crunch.PCollection; |
014 |
import org.apache.crunch.PGroupedTable; |
015 |
import org.apache.crunch.PTable; |
016 |
import org.apache.crunch.Pair; |
017 |
import org.apache.crunch.Pipeline; |
018 |
import org.apache.crunch.PipelineResult; |
019 |
import org.apache.crunch.impl.mr.MRPipeline; |
020 |
import org.apache.crunch.lib.Sort; |
021 |
import org.apache.crunch.lib.Sort.ColumnOrder; |
022 |
import org.apache.hadoop.conf.Configuration; |
023 |
import org.apache.hadoop.conf.Configured; |
024 |
import org.apache.hadoop.util.Tool; |
025 |
import org.apache.hadoop.util.ToolRunner; |
027 |
import com.google.common.base.Strings; |
029 |
public class WordCount extends Configured implements Tool, Serializable { |
031 |
private static final long serialVersionUID = 1L; |
034 |
public int run(String[] args) throws Exception { |
035 |
if (args.length != 2 ) { |
036 |
System.err.println( "Usage: hadoop jar crunch-0.0.1-SNAPSHOT" + |
037 |
WordCount. class .getName() + " <input> <output>" ); |
041 |
String inputPath = args[ 0 ]; |
042 |
String outputPath = args[ 1 ]; |
045 |
Pipeline pipeline = new MRPipeline(WordCount. class , getConf()); |
046 |
PCollection<String> lines = pipeline.readTextFile(inputPath); |
049 |
PTable<String, Integer> mappedWords = map(lines); |
052 |
PGroupedTable<String, Integer> groupedWords = mappedWords.groupByKey(); |
055 |
PTable<String, Integer> reducedWords = reduce(groupedWords); |
058 |
PCollection<Pair<String, Integer>> sortedValues = |
059 |
Sort.sortPairs(reducedWords, ColumnOrder.by( 2 , Sort.Order.DESCENDING)); |
062 |
pipeline.writeTextFile(sortedValues, outputPath); |
065 |
PipelineResult result = pipeline.done(); |
066 |
return result.succeeded() ? 0 : 1 ; |
069 |
private final PTable<String, Integer> map(PCollection<String> lines) { |
070 |
PTable<String, Integer> mappedWords = lines.parallelDo( new DoFn<String, Pair<String, Integer>>() { |
071 |
private static final long serialVersionUID = 1L; |
072 |
private static final String PATTERN = "\\s+" ; |
074 |
public void process(String input, Emitter<Pair<String, Integer>> emitter) { |
075 |
if (!Strings.isNullOrEmpty(input)) { |
076 |
for (String word : input.split(PATTERN)) { |
077 |
if (!Strings.isNullOrEmpty(word)) { |
078 |
emitter.emit(Pair.of(word, 1 )); |
083 |
}, tableOf(strings(), ints())); |
087 |
private final PTable<String, Integer> reduce(PGroupedTable<String, Integer> groupedWords) { |
088 |
PTable<String, Integer> reducedWords = groupedWords.combineValues( new CombineFn<String, Integer>() { |
089 |
private static final long serialVersionUID = 1L; |
091 |
public void process(Pair<String, Iterable<Integer>> values, Emitter<Pair<String, Integer>> emitter) { |
093 |
Iterator<Integer> iter = values.second().iterator(); |
094 |
while (iter.hasNext()) { |
095 |
count += iter.next(); |
097 |
emitter.emit(Pair.of(values.first(), count)); |
103 |
public static void main(String[] args) throws Exception { |
104 |
ToolRunner.run( new Configuration(), new WordCount(), args); |
上述代码中,可以在run方法中看到在完成一个计算任务的过程中,有序步骤序列中的每一步需要做什么都非常明确,而如果使用MapReduce API编写,我们可能会完全陷入API的使用方法中,而对整体执行流程没有一个更加直观的印象,尤其是对接触MapReduce不久的开发人员来说,更是难以理解。
运行程序和使用MapReduce API编写的程序使用相同的命令格式,下面运行我们基于Crunch编写的程序,执行如下命令:
1 |
hadoop jar crunch-0.0.1-SNAPSHOT.jar org.shirdrn.crunch.examples.WordCount /data/crunch/in /data/crunch/out |
01 |
15/03/06 15:48:44 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable |
02 |
15/03/06 15:48:45 INFO impl.FileTargetImpl: Will write output files to new path: /data/crunch/out |
03 |
15/03/06 15:48:45 INFO Configuration.deprecation: mapred.job.tracker is deprecated. Instead, use mapreduce.jobtracker.address |
04 |
15/03/06 15:48:46 INFO collect.PGroupedTableImpl: Setting num reduce tasks to 2 |
05 |
15/03/06 15:48:46 INFO client.RMProxy: Connecting to ResourceManager at h1/ |
06 |
15/03/06 15:48:47 INFO Configuration.deprecation: dfs.block.size is deprecated. Instead, use dfs.blocksize |
07 |
15/03/06 15:48:47 INFO input.FileInputFormat: Total input paths to process : 1 |
08 |
15/03/06 15:48:47 INFO input.CombineFileInputFormat: DEBUG: Terminated node allocation with : CompletedNodes: 2, size left: 51480009 |
09 |
15/03/06 15:48:47 INFO mapreduce.JobSubmitter: number of splits:27 |
10 |
15/03/06 15:48:47 INFO mapreduce.JobSubmitter: Submitting tokens for job: job_1422867923292_0012 |
11 |
15/03/06 15:48:48 INFO impl.YarnClientImpl: Submitted application application_1422867923292_0012 |
13 |
15/03/06 15:48:48 INFO jobcontrol.CrunchControlledJob: Running job "org.shirdrn.crunch.examples.WordCount: Text(/data/crunch/in)+S0+GBK+combine+S1+SeqFile(/tmp/crun... ID=1 (1/2)" |
15 |
15/03/06 15:54:57 INFO client.RMProxy: Connecting to ResourceManager at h1/ |
16 |
15/03/06 15:54:58 INFO input.FileInputFormat: Total input paths to process : 2 |
17 |
15/03/06 15:54:58 INFO mapreduce.JobSubmitter: number of splits:24 |
18 |
15/03/06 15:54:58 INFO mapreduce.JobSubmitter: Submitting tokens for job: job_1422867923292_0013 |
19 |
15/03/06 15:54:58 INFO impl.YarnClientImpl: Submitted application application_1422867923292_0013 |
21 |
15/03/06 15:54:58 INFO jobcontrol.CrunchControlledJob: Running job "org.shirdrn.crunch.examples.WordCount: SeqFile(/tmp/crunch-1592377036/p1)+GBK+ungroup+PTables.va... ID=2 (2/2)" |
上面,我们基于Crunch编程库实现的WordCount程序,在Hadoop集群上运行时最终被转换成2个Job(application_1422867923292_0012和application_1422867923292_0013),也可以在YARN的Application页面上查看,如图所示:
01 |
[hadoop@h1 ~]$ hdfs dfs -cat /data/crunch/out/part-r-00000 | head |
02 |
15/03/06 16:24:42 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable |
使用MapReduce API编写join程序时,非常复杂,尤其是如果对MapReduce执行的原理理解不是很深刻时,实现程序可能会有一点困难,而且完成后可能也不是很直观。下面,我们使用Crunch API来实现一个表join的例子,两个表做连接,首先准备的数据文件如下所示:
01 |
86756ed5cccd69494f6b94fc31638835 c9049310 |
02 |
c4b0657cf93fc4f05c87a76077e50090 c0000341 |
03 |
d5c33ce0a110ca3e2def912506bcd846 c9049310 |
04 |
7ed745aa273b67b7e9189256140c7220 c0087001 |
05 |
d31908a762c49c68826f9f5addb345cb c0000106 |
06 |
dd3ced7cf15ba397f84eb3389b6ffc11 c0049146 |
07 |
c60bc23264e61e3ba553f6127138c77a c6201204 |
08 |
a892a2001947d872435edddad44da7d7 c9049310 |
09 |
d31908a762c49c68826f9f5addb345cb c9001500 |
10 |
dd3ced7cf15ba397f84eb3389b6ffc11 c6820012 |
11 |
c60bc23264e61e3ba553f6127138c77a c0055102 |
12 |
a892a2001947d872435edddad44da7d7 c9049310 |
13 |
847383774f24b1904a9ada6af7028f52 c0049146 |
14 |
443f9f5c9f8c16a8a676bb567451a65f c6820012 |
15 |
a6b88a8ad016af214ba30aef03b450eb c0055102 |
16 |
f57cab9d9712e9884a5f4a373af8b220 c0055102 |
17 |
3762865e195e7885e41fe748671657c2 c0049146 |
18 |
34323bfbfe3ea8561952769af0bc117f c9000541 |
19 |
9b4e112db554d44ae5c8140a40f7d389 c0049146 |
20 |
f26fd561ab357cf2aad041eaa17c30b4 c0055102 |
21 |
2812d7c1641580784b19f7ef1b1fdbf2 c6820012 |
22 |
7ed745aa273b67b7e9189256140c7220 c9049310 |
23 |
19bacba14597d2e8994ce50a5d47c2b4 c0055102 |
24 |
B9BEDD6A957377FDEA2A78D4B614531A c9000201 |
25 |
7aee01193033491f12342a7b9dbbe4a3 c0000106 |
26 |
77b80cfc95b04ba427412ae5fc276d6f c0055102 |
27 |
e1f7c264271e1ee2e2a78d0798354d89 c6820012 |
28 |
68aeafdfd07e85b41faafb33fc63600c c0055102 |
29 |
556BB4023B23D4C8DD70E81FDD270121 c0049146 |
30 |
d31908a762c49c68826f9f5addb345cb c0055102 |
31 |
14b769541197dc391767ddbce878393b c0055102 |
32 |
c60bc23264e61e3ba553f6127138c77a c0000106 |
33 |
2708bb4bb8f967c607bc82749d892ebf c0055102 |
34 |
70F9442DAE5539ED99E280F12AFA4EC1 c9049310 |
01 |
package org.shirdrn.crunch.examples; |
03 |
import java.io.Serializable; |
05 |
import org.apache.crunch.DoFn; |
06 |
import org.apache.crunch.Emitter; |
07 |
import org.apache.crunch.PCollection; |
08 |
import org.apache.crunch.PTable; |
09 |
import org.apache.crunch.Pair; |
10 |
import org.apache.crunch.Pipeline; |
11 |
import org.apache.crunch.PipelineResult; |
12 |
import org.apache.crunch.impl.mr.MRPipeline; |
13 |
import org.apache.crunch.lib.Join; |
14 |
import org.apache.crunch.types.PTypeFamily; |
15 |
import org.apache.hadoop.conf.Configuration; |
16 |
import org.apache.hadoop.conf.Configured; |
17 |
import org.apache.hadoop.util.Tool; |
18 |
import org.apache.hadoop.util.ToolRunner; |
20 |
public class JoinUserChannel extends Configured implements Tool, Serializable { |
22 |
private static final long serialVersionUID = 1L; |
25 |
public int run(String[] args) throws Exception { |
26 |
if (args.length != 3 ) { |
27 |
System.err.println( "Usage: hadoop jar crunch-0.0.1-SNAPSHOT" + |
28 |
JoinUserChannel. class .getName() + " <user_input> <channel_input> <output>" ); |
32 |
String userPath = args[ 0 ]; |
33 |
String channelPath = args[ 1 ]; |
34 |
String outputPath = args[ 2 ]; |
37 |
Pipeline pipeline = new MRPipeline(JoinUserChannel. class , getConf()); |
40 |
PCollection<String> users = pipeline.readTextFile(userPath); |
41 |
PTypeFamily uTF = users.getTypeFamily(); |
42 |
PTable<String, String> left = users.parallelDo( new DoFn<String, Pair<String, String>>() { |
43 |
private static final long serialVersionUID = 1L; |
45 |
public void process(String input, Emitter<Pair<String, String>> emitter) { |
46 |
String[] kv = input.split( "\\s+" ); |
48 |
String userId = kv[ 0 ]; |
49 |
String channelId = kv[ 1 ].trim(); |
50 |
emitter.emit(Pair.of(channelId, userId)); |
54 |
}, uTF.tableOf(uTF.strings(), uTF.strings())); |
57 |
PCollection<String> channels = pipeline.readTextFile(channelPath); |
58 |
PTypeFamily cTF = channels.getTypeFamily(); |
59 |
PTable<String, String> right = channels.parallelDo( new DoFn<String, Pair<String, String>>() { |
60 |
private static final long serialVersionUID = 1L; |
62 |
public void process(String input, Emitter<Pair<String, String>> emitter) { |
63 |
String[] kv = input.split( "\\s+" ); |
65 |
String channelId = kv[ 0 ].trim(); |
66 |
String channelName = kv[ 1 ]; |
67 |
emitter.emit(Pair.of(channelId, channelName)); |
71 |
}, cTF.tableOf(cTF.strings(), cTF.strings())); |
74 |
PTable<String, Pair<String, String>> joinedResult = Join.innerJoin(left, right); |
75 |
pipeline.writeTextFile(joinedResult, outputPath); |
78 |
PipelineResult result = pipeline.done(); |
79 |
return result.succeeded() ? 0 : 1 ; |
82 |
public static void main(String[] args) throws Exception { |
83 |
ToolRunner.run( new Configuration(), new JoinUserChannel(), args); |
首先分别将两个文件的数据读取到PCollection<String>集合对象中,再通过parallelDo将每行解析为以频道ID(channelId)为key的PTable<String, String>集合对象,然后通过Join工具类调用innerJoin方法实现内连接操作,返回PTable<String, Pair<String, String>>,最后写入HDFS文件系统中。进行表join操作时,如果是内连接,也可以直接调用PTable<String, String>集合的join方法,其功能与调用Join工具类的innerJoin和join方法相同。
1 |
hadoop jar crunch-0.0.1-SNAPSHOT.jar org.shirdrn.crunch.examples.JoinUserChannel /data/crunch/user/user_info.txt /data/crunch/user/channel_info.txt /data/crunch/user/joined |
01 |
[hadoop@h1 crunch]$ hdfs dfs -ls /data/crunch/user/joined |
02 |
15/03/06 18:53:42 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable |
04 |
-rw-r--r-- 3 hadoop supergroup 0 2015-03-06 18:53 /data/crunch/user/joined/_SUCCESS |
05 |
-rw-r--r-- 3 hadoop supergroup 1777 2015-03-06 18:53 /data/crunch/user/joined/part-r-00000 |
06 |
[hadoop@h1 crunch]$ hdfs dfs -cat /data/crunch/user/joined/part-r-00000 | head |
07 |
15/03/06 18:54:13 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable |
08 |
[c0000106,[7aee01193033491f12342a7b9dbbe4a3,bjrow]] |
09 |
[c0000106,[c60bc23264e61e3ba553f6127138c77a,bjrow]] |
10 |
[c0000106,[d31908a762c49c68826f9f5addb345cb,bjrow]] |
11 |
[c0000341,[c4b0657cf93fc4f05c87a76077e50090,Anet]] |
12 |
[c0049146,[3762865e195e7885e41fe748671657c2,pwest]] |
13 |
[c0049146,[556BB4023B23D4C8DD70E81FDD270121,pwest]] |
14 |
[c0049146,[9b4e112db554d44ae5c8140a40f7d389,pwest]] |
15 |
[c0049146,[dd3ced7cf15ba397f84eb3389b6ffc11,pwest]] |
16 |
[c0049146,[847383774f24b1904a9ada6af7028f52,pwest]] |
17 |
[c0055102,[f26fd561ab357cf2aad041eaa17c30b4,susu]] |
目前,Crunch还处在项目初期,最新版本是0.11,当前只在Hadoop 2.2.0及以下版本上测试过,如果在高于2.2.0版本的平台上运行,可能会遇到一些小问题或不完善之处。上面我们开发的例子程序就运行在Hadoop 2.6.0平台上,并遇到了这样的问题:在读取提交程序的节点上Hadoop配置文件中的block size配置时,不支持高版本中类似64m、2g等带单位的配置内容,而必须使用以数字表示的字节数。