Apache Crunch provides a Java API that simplifies writing, testing, and running MapReduce pipeline programs. The basic idea of Crunch is to hide the details of writing MapReduce code behind a set of functional-style interfaces. Since Java has no native support for functional programming, these interfaces are implemented through callbacks, so the resulting code is not as concise or elegant as true functional code, but the structure of a MapReduce program becomes very clear, and it is much easier to write than native MapReduce code. If you build a complex pipeline directly on the MapReduce API, you have to work out the details of every job (what its Map and Reduce phases do). With the Crunch library, you only need to lay out the processing steps of your business logic and call the operators Crunch provides (function-like operators such as union, join, filter, groupBy, sort, and so on).
Below is a brief overview of the main features Crunch provides.
Let's first look at the collection abstractions Crunch defines for working with distributed data sets, shown in the class diagram below:
The class diagram above lists the method signatures of the collection interfaces; a method shown once by name may have additional overloads with different parameter lists. Crunch's high-level collection abstraction consists of just these three interfaces (PCollection, PTable, and PGroupedTable); for the concrete implementations, see the Crunch source code.
In the class diagram, the PTable interface has a join method, which by default performs an inner join (INNER JOIN). Crunch also ships a Join utility class that implements the various join flavors as static methods, as shown below:
```java
public static <K, U, V> PTable<K, Pair<U, V>> join(PTable<K, U> left, PTable<K, V> right)
public static <K, U, V> PTable<K, Pair<U, V>> innerJoin(PTable<K, U> left, PTable<K, V> right)
public static <K, U, V> PTable<K, Pair<U, V>> leftJoin(PTable<K, U> left, PTable<K, V> right)
public static <K, U, V> PTable<K, Pair<U, V>> rightJoin(PTable<K, U> left, PTable<K, V> right)
public static <K, U, V> PTable<K, Pair<U, V>> fullJoin(PTable<K, U> left, PTable<K, V> right)
```
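As a quick illustration (a minimal sketch, not from the original article; `usersByChannel` and `channelNames` are hypothetical PTable<String, String> collections keyed by channel ID), switching between join flavors is just a matter of calling a different static method:

```java
// Hypothetical sketch: usersByChannel maps channelId -> userId and
// channelNames maps channelId -> channelName; both are built elsewhere in the pipeline.
PTable<String, Pair<String, String>> inner = Join.innerJoin(usersByChannel, channelNames);
PTable<String, Pair<String, String>> left  = Join.leftJoin(usersByChannel, channelNames);  // keep users whose channel has no match
PTable<String, Pair<String, String>> full  = Join.fullJoin(usersByChannel, channelNames);  // keep unmatched rows from both sides
```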
Sorting is provided by the Sort utility class, which exposes a group of sort-related static methods, as shown below:
```java
public static <T> PCollection<T> sort(PCollection<T> collection)
public static <T> PCollection<T> sort(PCollection<T> collection, Order order)
public static <T> PCollection<T> sort(PCollection<T> collection, int numReducers, Order order)
public static <K, V> PTable<K, V> sort(PTable<K, V> table)
public static <K, V> PTable<K, V> sort(PTable<K, V> table, Order key)
public static <K, V> PTable<K, V> sort(PTable<K, V> table, int numReducers, Order key)
public static <U, V> PCollection<Pair<U, V>> sortPairs(PCollection<Pair<U, V>> collection, ColumnOrder... columnOrders)
public static <V1, V2, V3> PCollection<Tuple3<V1, V2, V3>> sortTriples(PCollection<Tuple3<V1, V2, V3>> collection, ColumnOrder... columnOrders)
public static <V1, V2, V3, V4> PCollection<Tuple4<V1, V2, V3, V4>> sortQuads(PCollection<Tuple4<V1, V2, V3, V4>> collection, ColumnOrder... columnOrders)
public static <V1, V2, V3, V4> PCollection<Tuple4<V1, V2, V3, V4>> sortQuads(Collection<Tuple4<V1, V2, V3, V4>> collection, ColumnOrder... columnOrders)
public static <T extends Tuple> PCollection<T> sortTuples(PCollection<T> collection, ColumnOrder... columnOrders)
public static <T extends Tuple> PCollection<T> sortTuples(PCollection<T> collection, int numReducers, ColumnOrder... columnOrders)
```
As you can see, sorting is very convenient; the WordCount example later in this article makes use of it.
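Before that, here is a minimal, hypothetical fragment (not from the original article) assuming a PTable<String, Integer> named `wordCounts` built earlier in a pipeline, just to show what the calls look like:

```java
// Hypothetical fragment: wordCounts is a PTable<String, Integer> produced earlier.
// Total sort of the table, descending (the Order argument is the parameter named `key` above):
PTable<String, Integer> byWordDesc = Sort.sort(wordCounts, Sort.Order.DESCENDING);

// Sort the (word, count) pairs by the second column (the count), descending;
// this is exactly the call the WordCount example below uses:
PCollection<Pair<String, Integer>> byCountDesc =
    Sort.sortPairs(wordCounts, Sort.ColumnOrder.by(2, Sort.Order.DESCENDING));
```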
Secondary sort is implemented by the SecondarySort utility class, again as a set of static methods:
```java
public static <K, V1, V2, T> PCollection<T> sortAndApply(PTable<K, Pair<V1, V2>> input, DoFn<Pair<K, Iterable<Pair<V1, V2>>>, T> doFn, PType<T> ptype)
public static <K, V1, V2, T> PCollection<T> sortAndApply(PTable<K, Pair<V1, V2>> input, DoFn<Pair<K, Iterable<Pair<V1, V2>>>, T> doFn, PType<T> ptype, int numReducers)
public static <K, V1, V2, U, V> PTable<U, V> sortAndApply(PTable<K, Pair<V1, V2>> input, DoFn<Pair<K, Iterable<Pair<V1, V2>>>, Pair<U, V>> doFn, PTableType<U, V> ptype)
public static <K, V1, V2, U, V> PTable<U, V> sortAndApply(PTable<K, Pair<V1, V2>> input, DoFn<Pair<K, Iterable<Pair<V1, V2>>>, Pair<U, V>> doFn, PTableType<U, V> ptype, int numReducers)
```
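The values for each key are handed to the DoFn sorted by the first element of the value pair. A minimal sketch (hypothetical, not from the original article; it assumes a PTable<String, Pair<Long, String>> named `eventsByUser` mapping a user ID to (timestamp, event) pairs, built with the Writables type family) that emits each user's earliest event might look roughly like this:

```java
// Hypothetical sketch: eventsByUser maps userId -> (timestamp, eventName);
// SecondarySort hands the pairs for each userId to the DoFn sorted by timestamp.
PCollection<String> firstEvents = SecondarySort.sortAndApply(
    eventsByUser,
    new DoFn<Pair<String, Iterable<Pair<Long, String>>>, String>() {
        private static final long serialVersionUID = 1L;

        @Override
        public void process(Pair<String, Iterable<Pair<Long, String>>> input,
                            Emitter<String> emitter) {
            // The iterable is already sorted by timestamp, so the first element
            // is the earliest event for this user.
            for (Pair<Long, String> event : input.second()) {
                emitter.emit(input.first() + "\t" + event.second());
                break;
            }
        }
    },
    Writables.strings());
```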
The Distinct utility class provides the following static methods:
```java
public static <S> PCollection<S> distinct(PCollection<S> input)
public static <K, V> PTable<K, V> distinct(PTable<K, V> input)
public static <S> PCollection<S> distinct(PCollection<S> input, int flushEvery)
public static <K, V> PTable<K, V> distinct(PTable<K, V> input, int flushEvery)
```
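A minimal fragment (hypothetical; `lines` stands for a PCollection<String> read earlier in a pipeline). The flushEvery variant bounds how many unique elements are buffered in memory before being flushed, which helps when there are many distinct values per task:

```java
// Hypothetical fragment: lines is a PCollection<String> read earlier in the pipeline.
PCollection<String> uniqueLines = Distinct.distinct(lines);

// Same result, but flush the in-memory set of seen elements every 20,000 entries
// to keep per-task memory usage bounded (20,000 is an arbitrary value chosen for illustration).
PCollection<String> uniqueLinesBounded = Distinct.distinct(lines, 20000);
```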
The Sample utility class provides a group of static methods:
```java
public static <S> PCollection<S> sample(PCollection<S> input, double probability)
public static <S> PCollection<S> sample(PCollection<S> input, Long seed, double probability)
public static <K, V> PTable<K, V> sample(PTable<K, V> input, double probability)
public static <K, V> PTable<K, V> sample(PTable<K, V> input, Long seed, double probability)
public static <T> PCollection<T> reservoirSample(PCollection<T> input, int sampleSize)
public static <T> PCollection<T> reservoirSample(PCollection<T> input, int sampleSize, Long seed)
public static <T, N extends Number> PCollection<T> weightedReservoirSample(PCollection<Pair<T, N>> input, int sampleSize)
public static <T, N extends Number> PCollection<T> weightedReservoirSample(PCollection<Pair<T, N>> input, int sampleSize, Long seed)
public static <T, N extends Number> PCollection<Pair<Integer, T>> groupedWeightedReservoirSample(PTable<Integer, Pair<T, N>> input, int[] sampleSizes)
public static <T, N extends Number> PCollection<Pair<Integer, T>> groupedWeightedReservoirSample(PTable<Integer, Pair<T, N>> input, int[] sampleSizes, Long seed)
```
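Again a minimal, hypothetical fragment (assuming the same PCollection<String> named `lines`): sample keeps each element independently with the given probability, while reservoirSample returns a random sample of a fixed size:

```java
// Hypothetical fragment: lines is a PCollection<String> read earlier in the pipeline.
// Keep roughly 1% of the elements (each element is kept independently with p = 0.01).
PCollection<String> onePercent = Sample.sample(lines, 0.01);

// Draw a fixed-size random sample of 1,000 elements using reservoir sampling.
PCollection<String> thousandLines = Sample.reservoirSample(lines, 1000);
```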
The Mapreduce and Mapred utility classes let us reuse existing MapReduce Mapper and Reducer implementations inside a Crunch pipeline. Both classes expose the same pair of static methods, shown once below (Mapreduce works with the new org.apache.hadoop.mapreduce API, Mapred with the old org.apache.hadoop.mapred API):
```java
public static <K1, V1, K2 extends Writable, V2 extends Writable> PTable<K2, V2> map(
        PTable<K1, V1> input,
        Class<? extends Mapper<K1, V1, K2, V2>> mapperClass,
        Class<K2> keyClass, Class<V2> valueClass)

public static <K1, V1, K2 extends Writable, V2 extends Writable> PTable<K2, V2> reduce(
        PGroupedTable<K1, V1> input,
        Class<? extends Reducer<K1, V1, K2, V2>> reducerClass,
        Class<K2> keyClass, Class<V2> valueClass)
```
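As a sketch of how this is used (hypothetical names throughout, not from the original article; it assumes an existing LegacyTokenizerMapper extends Mapper<Text, Text, Text, IntWritable> and a LegacySumReducer extends Reducer<Text, IntWritable, Text, IntWritable> that you want to reuse unchanged, plus a PTable<Text, Text> named `input` built earlier in the pipeline):

```java
// Hypothetical sketch: reuse an existing Mapper/Reducer pair inside a Crunch pipeline.
// input is assumed to be a PTable<Text, Text> produced earlier in the pipeline.
PTable<Text, IntWritable> mapped =
        Mapreduce.map(input, LegacyTokenizerMapper.class, Text.class, IntWritable.class);

// The grouped output can likewise be pushed through the existing Reducer:
PTable<Text, IntWritable> reduced =
        Mapreduce.reduce(mapped.groupByKey(), LegacySumReducer.class, Text.class, IntWritable.class);
```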
Now let's implement two examples to get a feel for the difference between programming with Crunch and with the native MapReduce API.
WordCount: computing word frequencies
The WordCount program we want to build with the Crunch library breaks down into the following steps:
- Read a directory of text files from HDFS
- Split each line on whitespace into <word, 1> pairs (the Map phase of the MapReduce job)
- Group the resulting collection by key
- Reduce each group to a per-word count, <word, global count> (the Reduce phase)
- Sort the results by the global count in descending order
- Write the results back to HDFS
Implementing these steps with the Crunch library gives the following code:
```java
package org.shirdrn.crunch.examples;

import static org.apache.crunch.types.writable.Writables.ints;
import static org.apache.crunch.types.writable.Writables.strings;
import static org.apache.crunch.types.writable.Writables.tableOf;

import java.io.Serializable;
import java.util.Iterator;

import org.apache.crunch.CombineFn;
import org.apache.crunch.DoFn;
import org.apache.crunch.Emitter;
import org.apache.crunch.PCollection;
import org.apache.crunch.PGroupedTable;
import org.apache.crunch.PTable;
import org.apache.crunch.Pair;
import org.apache.crunch.Pipeline;
import org.apache.crunch.PipelineResult;
import org.apache.crunch.impl.mr.MRPipeline;
import org.apache.crunch.lib.Sort;
import org.apache.crunch.lib.Sort.ColumnOrder;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

import com.google.common.base.Strings;

public class WordCount extends Configured implements Tool, Serializable {

    private static final long serialVersionUID = 1L;

    @Override
    public int run(String[] args) throws Exception {
        if (args.length != 2) {
            System.err.println("Usage: hadoop jar crunch-0.0.1-SNAPSHOT.jar " +
                    WordCount.class.getName() + " <input> <output>");
            return 1;
        }
        String inputPath = args[0];
        String outputPath = args[1];

        // Create the pipeline and read the input text files from HDFS
        Pipeline pipeline = new MRPipeline(WordCount.class, getConf());
        PCollection<String> lines = pipeline.readTextFile(inputPath);

        // Map phase: split each line into <word, 1> pairs
        PTable<String, Integer> mappedWords = map(lines);

        // Group the pairs by word
        PGroupedTable<String, Integer> groupedWords = mappedWords.groupByKey();

        // Reduce phase: sum the counts for each word
        PTable<String, Integer> reducedWords = reduce(groupedWords);

        // Sort by the count (the second column of each pair), descending
        PCollection<Pair<String, Integer>> sortedValues =
                Sort.sortPairs(reducedWords, ColumnOrder.by(2, Sort.Order.DESCENDING));

        // Write the result back to HDFS
        pipeline.writeTextFile(sortedValues, outputPath);

        PipelineResult result = pipeline.done();
        return result.succeeded() ? 0 : 1;
    }

    private final PTable<String, Integer> map(PCollection<String> lines) {
        PTable<String, Integer> mappedWords = lines.parallelDo(new DoFn<String, Pair<String, Integer>>() {
            private static final long serialVersionUID = 1L;
            private static final String PATTERN = "\\s+";

            @Override
            public void process(String input, Emitter<Pair<String, Integer>> emitter) {
                if (!Strings.isNullOrEmpty(input)) {
                    for (String word : input.split(PATTERN)) {
                        if (!Strings.isNullOrEmpty(word)) {
                            emitter.emit(Pair.of(word, 1));
                        }
                    }
                }
            }
        }, tableOf(strings(), ints()));
        return mappedWords;
    }

    private final PTable<String, Integer> reduce(PGroupedTable<String, Integer> groupedWords) {
        PTable<String, Integer> reducedWords = groupedWords.combineValues(new CombineFn<String, Integer>() {
            private static final long serialVersionUID = 1L;

            @Override
            public void process(Pair<String, Iterable<Integer>> values, Emitter<Pair<String, Integer>> emitter) {
                int count = 0;
                Iterator<Integer> iter = values.second().iterator();
                while (iter.hasNext()) {
                    count += iter.next();
                }
                emitter.emit(Pair.of(values.first(), count));
            }
        });
        return reducedWords;
    }

    public static void main(String[] args) throws Exception {
        ToolRunner.run(new Configuration(), new WordCount(), args);
    }
}
```
In the run method above, every step in the sequence of operations is explicit and easy to follow. Written against the raw MapReduce API, we can easily get lost in the mechanics of the API and lose this bird's-eye view of the overall flow, which is especially hard on developers who are new to MapReduce.
The program is launched with the same command format as a program written against the MapReduce API. To run our Crunch-based program, execute the following command:
```
hadoop jar crunch-0.0.1-SNAPSHOT.jar org.shirdrn.crunch.examples.WordCount /data/crunch/in /data/crunch/out
```
Sample console output from the run is shown below:
```
15/03/06 15:48:44 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
15/03/06 15:48:45 INFO impl.FileTargetImpl: Will write output files to new path: /data/crunch/out
15/03/06 15:48:45 INFO Configuration.deprecation: mapred.job.tracker is deprecated. Instead, use mapreduce.jobtracker.address
15/03/06 15:48:46 INFO collect.PGroupedTableImpl: Setting num reduce tasks to 2
15/03/06 15:48:46 INFO client.RMProxy: Connecting to ResourceManager at h1/192.168.4.142:8032
15/03/06 15:48:47 INFO Configuration.deprecation: dfs.block.size is deprecated. Instead, use dfs.blocksize
15/03/06 15:48:47 INFO input.FileInputFormat: Total input paths to process : 1
15/03/06 15:48:47 INFO input.CombineFileInputFormat: DEBUG: Terminated node allocation with : CompletedNodes: 2, size left: 51480009
15/03/06 15:48:47 INFO mapreduce.JobSubmitter: number of splits:27
15/03/06 15:48:47 INFO mapreduce.JobSubmitter: Submitting tokens for job: job_1422867923292_0012
15/03/06 15:48:48 INFO impl.YarnClientImpl: Submitted application application_1422867923292_0012
15/03/06 15:48:48 INFO jobcontrol.CrunchControlledJob: Running job "org.shirdrn.crunch.examples.WordCount: Text(/data/crunch/in)+S0+GBK+combine+S1+SeqFile(/tmp/crun... ID=1 (1/2)"
15/03/06 15:54:57 INFO client.RMProxy: Connecting to ResourceManager at h1/192.168.4.142:8032
15/03/06 15:54:58 INFO input.FileInputFormat: Total input paths to process : 2
15/03/06 15:54:58 INFO mapreduce.JobSubmitter: number of splits:24
15/03/06 15:54:58 INFO mapreduce.JobSubmitter: Submitting tokens for job: job_1422867923292_0013
15/03/06 15:54:58 INFO impl.YarnClientImpl: Submitted application application_1422867923292_0013
15/03/06 15:54:58 INFO jobcontrol.CrunchControlledJob: Running job "org.shirdrn.crunch.examples.WordCount: SeqFile(/tmp/crunch-1592377036/p1)+GBK+ungroup+PTables.va... ID=2 (2/2)"
```
As shown above, the WordCount program we wrote with the Crunch library is translated into 2 jobs when it runs on the Hadoop cluster (application_1422867923292_0012 and application_1422867923292_0013), which can also be seen on the YARN Application page, as shown in the figure:
After the run succeeds, we can check the program's output with the following command:
```
[hadoop@h1 ~]$ hdfs dfs -cat /data/crunch/out/part-r-00000 | head
15/03/06 16:24:42 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
```
The output confirms that the program behaves as we designed.
A table join program
Writing a join with the raw MapReduce API is quite involved, especially without a solid understanding of how MapReduce executes; the implementation can be difficult to get right and is not very readable once finished. Below we use the Crunch API to implement a join between two tables. First, the data files:
The user table is stored in user_info.txt. It comes from a mobile application and contains two TAB-separated fields: the user ID and the promotion channel the user came from. Sample lines look like this:
```
86756ed5cccd69494f6b94fc31638835    c9049310
c4b0657cf93fc4f05c87a76077e50090    c0000341
d5c33ce0a110ca3e2def912506bcd846    c9049310
7ed745aa273b67b7e9189256140c7220    c0087001
d31908a762c49c68826f9f5addb345cb    c0000106
dd3ced7cf15ba397f84eb3389b6ffc11    c0049146
c60bc23264e61e3ba553f6127138c77a    c6201204
a892a2001947d872435edddad44da7d7    c9049310
d31908a762c49c68826f9f5addb345cb    c9001500
dd3ced7cf15ba397f84eb3389b6ffc11    c6820012
c60bc23264e61e3ba553f6127138c77a    c0055102
a892a2001947d872435edddad44da7d7    c9049310
847383774f24b1904a9ada6af7028f52    c0049146
443f9f5c9f8c16a8a676bb567451a65f    c6820012
a6b88a8ad016af214ba30aef03b450eb    c0055102
f57cab9d9712e9884a5f4a373af8b220    c0055102
3762865e195e7885e41fe748671657c2    c0049146
34323bfbfe3ea8561952769af0bc117f    c9000541
9b4e112db554d44ae5c8140a40f7d389    c0049146
f26fd561ab357cf2aad041eaa17c30b4    c0055102
2812d7c1641580784b19f7ef1b1fdbf2    c6820012
7ed745aa273b67b7e9189256140c7220    c9049310
19bacba14597d2e8994ce50a5d47c2b4    c0055102
B9BEDD6A957377FDEA2A78D4B614531A    c9000201
7aee01193033491f12342a7b9dbbe4a3    c0000106
77b80cfc95b04ba427412ae5fc276d6f    c0055102
e1f7c264271e1ee2e2a78d0798354d89    c6820012
68aeafdfd07e85b41faafb33fc63600c    c0055102
556BB4023B23D4C8DD70E81FDD270121    c0049146
d31908a762c49c68826f9f5addb345cb    c0055102
14b769541197dc391767ddbce878393b    c0055102
c60bc23264e61e3ba553f6127138c77a    c0000106
2708bb4bb8f967c607bc82749d892ebf    c0055102
70F9442DAE5539ED99E280F12AFA4EC1    c9049310
```
The channel table is stored in channel_info.txt. Each line contains two TAB-separated fields: the channel ID and the channel name.
Since the user table only stores the channel ID, and the reports need to display the channel name, we join the two tables on the channel ID so that the report can show the channel name for each user.
Implementing this requirement with Crunch is simple and direct; the code is shown below:
```java
package org.shirdrn.crunch.examples;

import java.io.Serializable;

import org.apache.crunch.DoFn;
import org.apache.crunch.Emitter;
import org.apache.crunch.PCollection;
import org.apache.crunch.PTable;
import org.apache.crunch.Pair;
import org.apache.crunch.Pipeline;
import org.apache.crunch.PipelineResult;
import org.apache.crunch.impl.mr.MRPipeline;
import org.apache.crunch.lib.Join;
import org.apache.crunch.types.PTypeFamily;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class JoinUserChannel extends Configured implements Tool, Serializable {

    private static final long serialVersionUID = 1L;

    @Override
    public int run(String[] args) throws Exception {
        if (args.length != 3) {
            System.err.println("Usage: hadoop jar crunch-0.0.1-SNAPSHOT.jar " +
                    JoinUserChannel.class.getName() + " <user_input> <channel_input> <output>");
            return 1;
        }
        String userPath = args[0];
        String channelPath = args[1];
        String outputPath = args[2];

        Pipeline pipeline = new MRPipeline(JoinUserChannel.class, getConf());

        // Left table: parse user_info.txt into <channelId, userId> pairs
        PCollection<String> users = pipeline.readTextFile(userPath);
        PTypeFamily uTF = users.getTypeFamily();
        PTable<String, String> left = users.parallelDo(new DoFn<String, Pair<String, String>>() {
            private static final long serialVersionUID = 1L;

            @Override
            public void process(String input, Emitter<Pair<String, String>> emitter) {
                String[] kv = input.split("\\s+");
                if (kv.length == 2) {
                    String userId = kv[0];
                    String channelId = kv[1].trim();
                    emitter.emit(Pair.of(channelId, userId));
                }
            }
        }, uTF.tableOf(uTF.strings(), uTF.strings()));

        // Right table: parse channel_info.txt into <channelId, channelName> pairs
        PCollection<String> channels = pipeline.readTextFile(channelPath);
        PTypeFamily cTF = channels.getTypeFamily();
        PTable<String, String> right = channels.parallelDo(new DoFn<String, Pair<String, String>>() {
            private static final long serialVersionUID = 1L;

            @Override
            public void process(String input, Emitter<Pair<String, String>> emitter) {
                String[] kv = input.split("\\s+");
                if (kv.length == 2) {
                    String channelId = kv[0].trim();
                    String channelName = kv[1];
                    emitter.emit(Pair.of(channelId, channelName));
                }
            }
        }, cTF.tableOf(cTF.strings(), cTF.strings()));

        // Inner join on the channel ID and write the joined result to HDFS
        PTable<String, Pair<String, String>> joinedResult = Join.innerJoin(left, right);
        pipeline.writeTextFile(joinedResult, outputPath);

        PipelineResult result = pipeline.done();
        return result.succeeded() ? 0 : 1;
    }

    public static void main(String[] args) throws Exception {
        ToolRunner.run(new Configuration(), new JoinUserChannel(), args);
    }
}
```
Each of the two files is first read into a PCollection<String> and then transformed into a PTable<String, String>. The inner join is performed by calling the innerJoin method of the Join utility class, which returns a PTable<String, Pair<String, String>> that is finally written to HDFS. For an inner join, you can also call the join method directly on the PTable<String, String> collection; it is equivalent to the Join utility class's innerJoin and join methods.
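In other words (a one-line variation on the code above, not a separate example), the same inner join could have been expressed without the utility class:

```java
// Equivalent to Join.innerJoin(left, right) / Join.join(left, right):
PTable<String, Pair<String, String>> joinedResult = left.join(right);
```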
Run the program with the following command:
```
hadoop jar crunch-0.0.1-SNAPSHOT.jar org.shirdrn.crunch.examples.JoinUserChannel /data/crunch/user/user_info.txt /data/crunch/user/channel_info.txt /data/crunch/user/joined
```
Then inspect the results:
```
[hadoop@h1 crunch]$ hdfs dfs -ls /data/crunch/user/joined
15/03/06 18:53:42 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
-rw-r--r--   3 hadoop supergroup          0 2015-03-06 18:53 /data/crunch/user/joined/_SUCCESS
-rw-r--r--   3 hadoop supergroup       1777 2015-03-06 18:53 /data/crunch/user/joined/part-r-00000
[hadoop@h1 crunch]$ hdfs dfs -cat /data/crunch/user/joined/part-r-00000 | head
15/03/06 18:54:13 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
[c0000106,[7aee01193033491f12342a7b9dbbe4a3,bjrow]]
[c0000106,[c60bc23264e61e3ba553f6127138c77a,bjrow]]
[c0000106,[d31908a762c49c68826f9f5addb345cb,bjrow]]
[c0000341,[c4b0657cf93fc4f05c87a76077e50090,Anet]]
[c0049146,[3762865e195e7885e41fe748671657c2,pwest]]
[c0049146,[556BB4023B23D4C8DD70E81FDD270121,pwest]]
[c0049146,[9b4e112db554d44ae5c8140a40f7d389,pwest]]
[c0049146,[dd3ced7cf15ba397f84eb3389b6ffc11,pwest]]
[c0049146,[847383774f24b1904a9ada6af7028f52,pwest]]
[c0055102,[f26fd561ab357cf2aad041eaa17c30b4,susu]]
```
Summary
The two examples above are just simple MapReduce programs written with Crunch, but they show that both experienced developers and newcomers can pick it up and become productive quickly.
At the moment Crunch is still a young project; the latest release is 0.11, and it has only been tested against Hadoop 2.2.0 and earlier, so running on a newer release may surface some small issues and rough edges. The example programs above were run on Hadoop 2.6.0, and one such issue showed up: when the block size is read from the Hadoop configuration files on the submitting node, the suffixed values supported by newer Hadoop versions (such as 64m or 2g) are not accepted; the value must be given as a plain number of bytes.
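For example (an illustrative workaround based on the issue just described, not an official Crunch recommendation), the block size can be pinned to a plain byte count in the client-side configuration before the pipeline is built:

```java
// Illustrative workaround: express the block size as a byte count instead of a suffixed value like "64m".
Configuration conf = new Configuration();
conf.set("dfs.blocksize", "67108864");   // 64 MB expressed in bytes
Pipeline pipeline = new MRPipeline(WordCount.class, conf);
```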
Future releases of Crunch will no doubt round out these features and further simplify the API, particularly around job tuning: Crunch's goal is for programs to run efficiently with little or no complex tuning, which is something to look forward to.
Crunch also provides support for running pipelines on Spark; see the official documentation for details.