Source code: https://github.com/hiszm/hadoop-train
HDFS Project in Practice
Requirements Analysis

Use the HDFS Java API to compute word frequencies over files stored on HDFS.

Example:
/test/1.txt ==> ' hello world'
/test/2.txt ==> ' hello world world'

Expected result: hello appears 2 times and world appears 3 times.
Code Framework

- 1: Read the files from HDFS (a sketch for preparing a test input file follows this list)
- 2: Count the word frequencies
- 3: Cache the intermediate results in a Map
- 4: Write the result back to HDFS
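Step 1 assumes the input file already exists on HDFS (the run at the end of this section reads /hdfsapi/local.txt). A minimal sketch, assuming the same NameNode address and user as the driver further below, for writing a small tab-separated test file to HDFS; the class name PrepareInputDemo is hypothetical, and the file contents just mirror the example from the requirements:

package com.bigdata.hadoop.hdfs;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

import java.net.URI;

// Hypothetical helper: write a tab-separated test file to HDFS
public class PrepareInputDemo {
    public static void main(String[] args) throws Exception {
        FileSystem fs = FileSystem.get(new URI("hdfs://hadoop000:8020"), new Configuration(), "hadoop");
        FSDataOutputStream out = fs.create(new Path("/hdfsapi/local.txt"));
        out.write("hello\tworld\n".getBytes());          // like /test/1.txt
        out.write("hello\tworld\tworld\n".getBytes());   // like /test/2.txt
        out.close();
        fs.close();
    }
}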
Context
package com.bigdata.hadoop.hdfs;

import java.util.HashMap;
import java.util.Map;

// Custom context: caches the intermediate results
public class Context {

    private Map<Object, Object> cacheMap = new HashMap<Object, Object>();

    public Map<Object, Object> getCacheMap() {
        return cacheMap;
    }

    /**
     * Write data into the cache
     * @param key   word
     * @param value count
     */
    public void write(Object key, Object value) {
        cacheMap.put(key, value);
    }

    /**
     * Read data from the cache
     * @param key word
     * @return the cached count
     */
    public Object get(Object key) {
        return cacheMap.get(key);
    }
}
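A quick standalone check of how the cache behaves; the class name ContextDemo is hypothetical and not part of the project:

package com.bigdata.hadoop.hdfs;

// Hypothetical demo: exercise the Context cache directly
public class ContextDemo {
    public static void main(String[] args) {
        Context context = new Context();
        context.write("hello", 1);                  // first occurrence
        context.write("hello", 2);                  // a second write overwrites the cached value
        System.out.println(context.get("hello"));   // prints 2
        System.out.println(context.get("world"));   // prints null: never written
    }
}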
Processing Classes
package com.bigdata.hadoop.hdfs;

public interface Mapper {

    /**
     * Process one line of input
     * @param line    a line read from the file
     * @param context the context/cache that accumulates the counts
     */
    public void map(String line, Context context);
}
package com.bigdata.hadoop.hdfs;

public class WordCount implements Mapper {

    @Override
    public void map(String line, Context context) {
        // the input file is assumed to be tab-separated
        String[] words = line.split("\t");
        for (String word : words) {
            Object value = context.get(word);
            if (value == null) { // the word has not been seen yet
                context.write(word, 1);
            } else {             // already cached: read the current count and add 1
                int v = Integer.parseInt(value.toString());
                context.write(word, v + 1);
            }
        }
    }
}
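The counting logic can be checked locally without touching HDFS. A minimal sketch; the class name WordCountDemo is hypothetical, and the two tab-separated lines mirror the /test/1.txt and /test/2.txt example from the requirements:

package com.bigdata.hadoop.hdfs;

// Hypothetical demo: run the Mapper against two in-memory lines
public class WordCountDemo {
    public static void main(String[] args) {
        Context context = new Context();
        Mapper mapper = new WordCount();
        mapper.map("hello\tworld", context);           // like /test/1.txt
        mapper.map("hello\tworld\tworld", context);    // like /test/2.txt
        System.out.println(context.get("hello"));      // prints 2
        System.out.println(context.get("world"));      // prints 3
    }
}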
Functionality Implementation
package com.bigdata.hadoop.hdfs;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.*;

import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.net.URI;
import java.util.Map;
import java.util.Set;

public class HDFSWCApp01 {

    public static void main(String[] args) throws Exception {

        // 1: read the files from HDFS
        Path input = new Path("/hdfsapi/local.txt");
        FileSystem fs = FileSystem.get(new URI("hdfs://hadoop000:8020"), new Configuration(), "hadoop");

        // iterator over the files under the input path (second argument: non-recursive)
        RemoteIterator<LocatedFileStatus> iterator = fs.listFiles(input, false);

        Mapper mapper = new WordCount();

        // create the context/cache
        Context context = new Context();

        while (iterator.hasNext()) {
            LocatedFileStatus file = iterator.next();
            FSDataInputStream in = fs.open(file.getPath());
            BufferedReader reader = new BufferedReader(new InputStreamReader(in));

            String line = "";
            while ((line = reader.readLine()) != null) {
                // 2: word-frequency counting
                // the business logic writes its results back into the cache
                mapper.map(line, context);
            }

            reader.close();
            in.close();
        }

        // 3: the processed results are cached in a Map
        Map<Object, Object> contextMap = context.getCacheMap();

        // 4: write the result to HDFS
        Path output = new Path("/hdfsapi/output/");
        FSDataOutputStream out = fs.create(new Path(output, new Path("wc.out")));

        // iterate over the cached entries
        Set<Map.Entry<Object, Object>> entries = contextMap.entrySet();
        for (Map.Entry<Object, Object> entry : entries) {
            out.write((entry.getKey().toString() + "\t" + entry.getValue() + "\n").getBytes());
        }

        out.close();
        fs.close();

        System.out.println("Word count finished");
    }
}
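The result file can also be read back with the same Java API instead of the hadoop fs shell. A minimal sketch, assuming the same NameNode address and user as above; the class name ReadResultDemo is hypothetical:

package com.bigdata.hadoop.hdfs;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.net.URI;

// Hypothetical helper: print wc.out back from HDFS
public class ReadResultDemo {
    public static void main(String[] args) throws Exception {
        FileSystem fs = FileSystem.get(new URI("hdfs://hadoop000:8020"), new Configuration(), "hadoop");
        FSDataInputStream in = fs.open(new Path("/hdfsapi/output/wc.out"));
        BufferedReader reader = new BufferedReader(new InputStreamReader(in));
        String line;
        while ((line = reader.readLine()) != null) {
            System.out.println(line);   // one "word <TAB> count" entry per line
        }
        reader.close();
        in.close();
        fs.close();
    }
}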
/Library/Java/JavaVirtualMachines/jdk1.8.0_181.jdk/Contents/Home/bin/java -Dfile.encoding=UTF-8 -classpath ... com.bigdata.hadoop.hdfs.HDFSWCApp01
log4j:WARN No appenders could be found for logger (org.apache.hadoop.metrics2.lib.MutableMetricsFactory).
log4j:WARN Please initialize the log4j system properly.
log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for more info.
Word count finished
Process finished with exit code 0
[hadoop@hadoop000 sbin]$ hadoop fs -text /hdfsapi/local.txt
hello world
hello hello
hello
world
[hadoop@hadoop000 sbin]$ hadoop fs -cat /hdfsapi/output/wc.out
1
world 2
hello 4