Section Contents
In the previous section we covered:
Introduction to MapReduce
Introduction to Hadoop serialization
Mapper coding conventions
Reducer coding conventions
Driver coding conventions
Developing the WordCount example
Testing WordCount locally
Background
I'm using three public cloud servers to set up a Hadoop learning environment for myself.
I had built this once before in a local VM but kept no notes, so with the three machines I picked up cheaply a few days ago, this time I'm trying a deployment on the public internet.
Note: if you plan to deploy on the public internet like I did, be sure to set up proper firewall rules to avoid unnecessary trouble!!!
Please treat this as a learning exercise only, and please do not probe or attack my services!!!
One of the servers is already running other things, such as autodl-keeper (a small tool I wrote recently to keep AutoDL machines from expiring) and a few other web services, so on that one I could only spare 2 cores and 2 GB. My setup is:
2C4G, host h121
2C4G, host h122
2C2G, host h123
Business Requirement
In day-to-day business systems, data is frequently split across separate tables that are related through some id or code.
The same need arises in big data: we often have to join tables.
Table 1
Columns: projectCode, projectName
Table 2
Columns: projectCode, projectType, projectFrom (project category)
In SQL we would fill in the missing fields with a LEFT JOIN. In big data the same operation is needed, and we implement it with MapReduce.
Table 1 test data
"8aea9ba2-435c-48bd-9751-1cbd4c344d4e" "Community Project 1"
"02d9c090-e467-42b6-9c14-52cacd72a4a8" "Community Project 2"
"244dcaca-0778-4eec-b3a2-403f8fac1dfb" "Smart Community"
"94befb97-d1af-43f2-b5d5-6df9ce5b9393" "Bus Stop"
"f44c8d10-bc92-4398-ad9b-8c11dd48ad7c" "Street Deployment"
"2e556d83-bb56-45b1-8d6e-00510902c464" "Street Bus Stop"
"3ba00542-eac9-4399-9c2b-3b06e671f4c9" "Unnamed Project 1"
"5a5982d7-7257-422f-822a-a0c2f31c28d1" "Unnamed Project 2"
Table 2 test data
"8aea9ba2-435c-48bd-9751-1cbd4c344d4e" "Important Type" "Category 1"
"02d9c090-e467-42b6-9c14-52cacd72a4a8" "Important Type" "Category 1"
"244dcaca-0778-4eec-b3a2-403f8fac1dfb" "Important Type" "Category 1"
"94befb97-d1af-43f2-b5d5-6df9ce5b9393" "Normal Type" "Category 1"
"f44c8d10-bc92-4398-ad9b-8c11dd48ad7c" "Normal Type" "Category 2"
"2e556d83-bb56-45b1-8d6e-00510902c464" "Normal Type" "Category 2"
"3ba00542-eac9-4399-9c2b-3b06e671f4c9" "General Type" "Category 2"
"5a5982d7-7257-422f-822a-a0c2f31c28d1" "General Type" "Category 2"
SQL Join
If we were joining these tables with SQL, it would look like this:
SELECT * FROM t_project LEFT JOIN t_project_info ON t_project.projectCode=t_project_info.projectCode
Reduce JOIN
Sometimes the tables are too large for a SQL join to be practical.
Here we write a MapReduce program to do the same job.
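The overall flow of a reduce-side join: in the map phase, each record is tagged with the table it came from and emitted with projectCode as the key; the shuffle then groups records from both tables that share a projectCode into a single reduce call; the reduce phase merges them into one complete record. The classes below implement exactly that.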
ProjectBean
This is the final bean class. It holds the union of the fields from both tables, and we will use it to carry out the join; the flag field records which source table a record came from.
package icu.wzk.demo03;

import org.apache.hadoop.io.Writable;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

public class ProjectBean implements Writable {

    private String projectCode;
    private String projectName;
    private String projectType;
    private String projectFrom;
    // Marks which table the record came from: "layout_project" or "project_info"
    private String flag;

    @Override
    public void write(DataOutput dataOutput) throws IOException {
        dataOutput.writeUTF(projectCode);
        dataOutput.writeUTF(projectName);
        dataOutput.writeUTF(projectType);
        dataOutput.writeUTF(projectFrom);
        dataOutput.writeUTF(flag);
    }

    @Override
    public void readFields(DataInput dataInput) throws IOException {
        // Fields must be read in exactly the order they were written
        this.projectCode = dataInput.readUTF();
        this.projectName = dataInput.readUTF();
        this.projectType = dataInput.readUTF();
        this.projectFrom = dataInput.readUTF();
        this.flag = dataInput.readUTF();
    }

    public ProjectBean(String projectCode, String projectName, String projectType,
                       String projectFrom, String flag) {
        this.projectCode = projectCode;
        this.projectName = projectName;
        this.projectType = projectType;
        this.projectFrom = projectFrom;
        this.flag = flag;
    }

    // The no-arg constructor is required so Hadoop can instantiate the bean via reflection
    public ProjectBean() {
    }

    @Override
    public String toString() {
        return "ProjectBean{" +
                "projectCode='" + projectCode + '\'' +
                ", projectName='" + projectName + '\'' +
                ", projectType='" + projectType + '\'' +
                ", projectFrom='" + projectFrom + '\'' +
                ", flag='" + flag + '\'' +
                '}';
    }

    public String getProjectCode() {
        return projectCode;
    }

    public void setProjectCode(String projectCode) {
        this.projectCode = projectCode;
    }

    public String getProjectName() {
        return projectName;
    }

    public void setProjectName(String projectName) {
        this.projectName = projectName;
    }

    public String getProjectType() {
        return projectType;
    }

    public void setProjectType(String projectType) {
        this.projectType = projectType;
    }

    public String getProjectFrom() {
        return projectFrom;
    }

    public void setProjectFrom(String projectFrom) {
        this.projectFrom = projectFrom;
    }

    public String getFlag() {
        return flag;
    }

    public void setFlag(String flag) {
        this.flag = flag;
    }
}
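As a quick sanity check of the Writable contract (write and readFields must mirror each other exactly), a round trip through a byte buffer can be run locally. This class is my own test scaffolding, not part of the job:

package icu.wzk.demo03;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;

public class ProjectBeanRoundTrip {
    public static void main(String[] args) throws IOException {
        ProjectBean original = new ProjectBean(
                "code-1", "Project A", "Type A", "Category A", "layout_project");

        // Serialize with write(), then deserialize the bytes with readFields()
        ByteArrayOutputStream buffer = new ByteArrayOutputStream();
        original.write(new DataOutputStream(buffer));

        ProjectBean restored = new ProjectBean();
        restored.readFields(new DataInputStream(
                new ByteArrayInputStream(buffer.toByteArray())));

        // The printed fields should match the original bean exactly
        System.out.println(restored);
    }
}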
ReducerJoinDriver
package icu.wzk.demo03;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;

public class ReducerJoinDriver {

    public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException {
        // String inputPath = args[0];
        // String outputPath = args[1];
        // === local test paths ===
        String inputPath = "project_test";
        String outputPath = "project_test_output";
        // === ===

        Configuration configuration = new Configuration();
        Job job = Job.getInstance(configuration, "ReducerJoinDriver");
        job.setJarByClass(ReducerJoinDriver.class);
        job.setMapperClass(ReducerJoinMapper.class);
        job.setReducerClass(ReducerJoinReducer.class);
        // Map output: projectCode -> tagged ProjectBean
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(ProjectBean.class);
        // Final output: the merged bean, with no value
        job.setOutputKeyClass(ProjectBean.class);
        job.setOutputValueClass(NullWritable.class);
        FileInputFormat.setInputPaths(job, new Path(inputPath));
        FileOutputFormat.setOutputPath(job, new Path(outputPath));
        boolean result = job.waitForCompletion(true);
        System.exit(result ? 0 : 1);
    }
}
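One gotcha when re-running locally: Hadoop aborts the job if the output directory already exists. A small guard like the following (a sketch, not in the original driver) placed before waitForCompletion saves deleting project_test_output by hand; it needs an extra import of org.apache.hadoop.fs.FileSystem:

// Delete a pre-existing output directory so repeated local runs don't fail
FileSystem fs = FileSystem.get(configuration);
Path output = new Path(outputPath);
if (fs.exists(output)) {
    fs.delete(output, true); // true = recursive delete
}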
ReducerJoinMapper
package icu.wzk.demo03;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

public class ReducerJoinMapper extends Mapper<LongWritable, Text, Text, ProjectBean> {

    String name;
    ProjectBean projectBean = new ProjectBean();
    Text k = new Text();

    @Override
    protected void setup(Mapper<LongWritable, Text, Text, ProjectBean>.Context context)
            throws IOException, InterruptedException {
        // The input split's description contains the file path,
        // which tells us which table this mapper is reading
        name = context.getInputSplit().toString();
    }

    @Override
    protected void map(LongWritable key, Text value,
                       Mapper<LongWritable, Text, Text, ProjectBean>.Context context)
            throws IOException, InterruptedException {
        String line = value.toString();
        if (name.contains("layout_project")) {
            // layout_project: projectCode <TAB> projectName
            String[] fields = line.split("\t");
            projectBean.setProjectCode(fields[0]);
            projectBean.setProjectName(fields[1]);
            projectBean.setProjectType("");
            projectBean.setProjectFrom("");
            projectBean.setFlag("layout_project");
            // join on projectCode
            k.set(fields[0]);
        } else {
            // project_info: projectCode <TAB> projectType <TAB> projectFrom
            String[] fields = line.split("\t");
            projectBean.setProjectCode(fields[0]);
            projectBean.setProjectName("");
            projectBean.setProjectType(fields[1]);
            projectBean.setProjectFrom(fields[2]);
            projectBean.setFlag("project_info");
            // join on projectCode
            k.set(fields[0]);
        }
        context.write(k, projectBean);
    }
}
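Note that context.getInputSplit().toString() only works here because FileSplit's toString happens to include the file path. A more explicit variant (assuming the splits really are FileSplit instances, which is the case for plain text input) reads the file name directly; it needs an extra import of org.apache.hadoop.mapreduce.lib.input.FileSplit:

@Override
protected void setup(Mapper<LongWritable, Text, Text, ProjectBean>.Context context) {
    // getPath().getName() yields just the file name, e.g. "layout_project.txt",
    // rather than relying on FileSplit's toString format
    name = ((FileSplit) context.getInputSplit()).getPath().getName();
}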
ReducerJoinReducer
package icu.wzk.demo03;

import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

public class ReducerJoinReducer extends Reducer<Text, ProjectBean, ProjectBean, NullWritable> {

    @Override
    protected void reduce(Text key, Iterable<ProjectBean> values,
                          Reducer<Text, ProjectBean, ProjectBean, NullWritable>.Context context)
            throws IOException, InterruptedException {
        List<ProjectBean> dataList = new ArrayList<>();
        ProjectBean deviceProjectBean = new ProjectBean();
        for (ProjectBean pb : values) {
            if ("layout_project".equals(pb.getFlag())) {
                // layout_project record: buffer a deep copy
                ProjectBean projectProjectBean = new ProjectBean(
                        pb.getProjectCode(),
                        pb.getProjectName(),
                        pb.getProjectType(),
                        pb.getProjectFrom(),
                        pb.getFlag()
                );
                dataList.add(projectProjectBean);
            } else {
                // project_info record: keep a deep copy of the lookup side
                deviceProjectBean = new ProjectBean(
                        pb.getProjectCode(),
                        pb.getProjectName(),
                        pb.getProjectType(),
                        pb.getProjectFrom(),
                        pb.getFlag()
                );
            }
        }
        // Fill the missing fields on every layout_project record and emit it
        for (ProjectBean pb : dataList) {
            pb.setProjectType(deviceProjectBean.getProjectType());
            pb.setProjectFrom(deviceProjectBean.getProjectFrom());
            context.write(pb, NullWritable.get());
        }
    }
}
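Two details worth calling out. First, Hadoop reuses a single ProjectBean instance for every element of the values iterator, which is why the loop makes a deep copy of each record instead of storing the reference; adding pb to the list directly would leave it full of pointers to the last record read. Second, this reducer assumes each projectCode has at most one project_info record; if several were possible, the lookup side would need to be buffered in a list as well.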
Output
ProjectBean{projectCode='"02d9c090-e467-42b6-9c14-52cacd72a4a8"', projectName='"Community Project 2"', projectType='"Important Type"', projectFrom='"Category 1"', flag='layout_project'}
ProjectBean{projectCode='"244dcaca-0778-4eec-b3a2-403f8fac1dfb"', projectName='"Smart Community"', projectType='"Important Type"', projectFrom='"Category 1"', flag='layout_project'}
ProjectBean{projectCode='"2e556d83-bb56-45b1-8d6e-00510902c464"', projectName='"Street Bus Stop"', projectType='"Normal Type"', projectFrom='"Category 2"', flag='layout_project'}
ProjectBean{projectCode='"3ba00542-eac9-4399-9c2b-3b06e671f4c9"', projectName='"Unnamed Project 1"', projectType='"General Type"', projectFrom='"Category 2"', flag='layout_project'}
ProjectBean{projectCode='"5a5982d7-7257-422f-822a-a0c2f31c28d1"', projectName='"Unnamed Project 2"', projectType='"General Type"', projectFrom='"Category 2"', flag='layout_project'}
ProjectBean{projectCode='"8aea9ba2-435c-48bd-9751-1cbd4c344d4e"', projectName='"Community Project 1"', projectType='"Important Type"', projectFrom='"Category 1"', flag='layout_project'}
ProjectBean{projectCode='"94befb97-d1af-43f2-b5d5-6df9ce5b9393"', projectName='"Bus Stop"', projectType='"Normal Type"', projectFrom='"Category 1"', flag='layout_project'}
ProjectBean{projectCode='"f44c8d10-bc92-4398-ad9b-8c11dd48ad7c"', projectName='"Street Deployment"', projectType='"Normal Type"', projectFrom='"Category 2"', flag='layout_project'}
Drawbacks
The JOIN is performed entirely in the reduce phase, so the reduce side carries a heavy processing load while the map nodes do very little computation, leaving cluster resources poorly utilized.
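The usual remedy is a map-side join: when one table (here project_info) is small enough to hold in memory, ship it to every map task, for example with job.addCacheFile(...), load it in setup(), and join in map() with no reduce phase at all. Below is a minimal sketch under those assumptions; the class and the local file name project_info.txt are hypothetical, not part of the code above:

package icu.wzk.demo03;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.BufferedReader;
import java.io.FileReader;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;

public class MapJoinMapper extends Mapper<LongWritable, Text, Text, NullWritable> {

    // projectCode -> "projectType<TAB>projectFrom", loaded once per map task
    private final Map<String, String> infoCache = new HashMap<>();

    @Override
    protected void setup(Context context) throws IOException {
        // Assumes project_info.txt was shipped to the task's working directory,
        // e.g. via job.addCacheFile(...) in the driver
        try (BufferedReader reader = new BufferedReader(new FileReader("project_info.txt"))) {
            String line;
            while ((line = reader.readLine()) != null) {
                String[] fields = line.split("\t");
                infoCache.put(fields[0], fields[1] + "\t" + fields[2]);
            }
        }
    }

    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        // Input is the big table (layout_project); join against the in-memory small table
        String[] fields = value.toString().split("\t");
        String info = infoCache.getOrDefault(fields[0], "\t");
        context.write(new Text(fields[0] + "\t" + fields[1] + "\t" + info),
                NullWritable.get());
    }
}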