3、Join应用
3.1 Reduce Join
(1)Map端的主要工作:为来自不同表或文件的key/value对,打标签以区别不同来源的记录。然后用连接字段作为key,其余部分和新加的标志作为value,最后进行输出。
(2)Reduce端的主要工作:在Reduce端以连接字段作为key的分组已经完成,我们只需要在每一个分组当中将那些来源于不同文件的记录(在Map阶段已经打标志)分开,最后进行合并就ok了。
3.2 Reduce Join实操
1、需求
2、需求分析
通过将关联条件作为Map输出的key,将两表满足Join条件的数据并携带数据所来源的文件信息,发往同一个ReduceTask,在Reduce中进行数据的串联。
3、代码实现
(1)创建商品和订单合并后的TableBean类
package org.example._09ReduceJoin; import org.apache.hadoop.io.Writable; import java.io.DataInput; import java.io.DataOutput; import java.io.IOException; /** * @ClassName TableBean * @Description TODO * @Author Zouhuiming * @Date 2023/5/22 15:09 * @Version 1.0 */ public class TableBean implements Writable { //订单表id private int id; //公司id private int pid; //库存数量 private int amount; //公司名字 private String pname; //表名 private String flag; public TableBean() { } public int getId() { return id; } public void setId(int id) { this.id = id; } public int getPid() { return pid; } public void setPid(int pid) { this.pid = pid; } public int getAmount() { return amount; } public void setAmount(int amount) { this.amount = amount; } public String getPname() { return pname; } public void setPname(String pname) { this.pname = pname; } public String getFlag() { return flag; } public void setFlag(String flag) { this.flag = flag; } @Override public void write(DataOutput dataOutput) throws IOException { dataOutput.writeInt(id); dataOutput.writeInt(pid); dataOutput.writeInt(amount); dataOutput.writeUTF(pname); dataOutput.writeUTF(flag); } @Override public void readFields(DataInput dataInput) throws IOException { this.id=dataInput.readInt(); this.pid=dataInput.readInt(); this.amount=dataInput.readInt(); this.pname= dataInput.readUTF(); this.flag=dataInput.readUTF(); } @Override public String toString() { return id+"\t"+pname+"\t"+amount; } }
(2)创建Mapper类
package org.example._09ReduceJoin; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.InputSplit; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.lib.input.FileSplit; import java.io.IOException; /** * @ClassName TableMapper * @Description TODO * @Author Zouhuiming * @Date 2023/5/22 15:13 * @Version 1.0 */ public class TableMapper extends Mapper<LongWritable, Text, IntWritable,TableBean> { //pid private IntWritable outK=new IntWritable(); private TableBean outV=new TableBean(); private String filename; //每一个切片进一次 @Override protected void setup(Mapper<LongWritable, Text, IntWritable, TableBean>.Context context) throws IOException, InterruptedException { //获取对应文件名称 InputSplit inputSplit = context.getInputSplit(); FileSplit fileSplit= (FileSplit) inputSplit; filename = fileSplit.getPath().getName(); } //每一行进一次 @Override protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, IntWritable, TableBean>.Context context) throws IOException, InterruptedException { String line = value.toString(); //判断是哪个文件,然后针对文件进行不同的操作 if (filename.contains("order")){//订单表的处理 String[] split = line.split("\t"); //封装outK outK.set(Integer.parseInt(split[1])); //封装outV outV.setId(Integer.parseInt(split[0])); outV.setPid(Integer.parseInt(split[1])); outV.setAmount(Integer.parseInt(split[2])); outV.setPname(""); outV.setFlag("order"); }else { //pd表的处理 String[] split = line.split("\t"); //封装outK outK.set(Integer.parseInt(split[0])); //封装outV outV.setId(0); outV.setPid(Integer.parseInt(split[0])); outV.setAmount(0); outV.setPname(split[1]); outV.setFlag("pd"); } context.write(outK,outV); } }
(3)创建Reduce类
package org.example._09ReduceJoin; import org.apache.commons.beanutils.BeanUtils; import org.apache.commons.beanutils.BeanUtilsBean; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.mapreduce.Reducer; import java.io.IOException; import java.lang.reflect.InvocationTargetException; import java.util.ArrayList; /** * @ClassName TableReducer * @Description TODO * @Author Zouhuiming * @Date 2023/5/22 15:27 * @Version 1.0 */ public class TableReducer extends Reducer<IntWritable,TableBean,TableBean, NullWritable> { //每次key相同进来一次 @Override protected void reduce(IntWritable key, Iterable<TableBean> values, Reducer<IntWritable, TableBean, TableBean, NullWritable>.Context context) throws IOException, InterruptedException { ArrayList<TableBean> orderBeans=new ArrayList<>(); TableBean pdBean=new TableBean(); for (TableBean value : values) { if ("order".equals(value.getFlag())){//订单表 //创建一个临时TableBean对象接受Value TableBean tmpOrderBean=new TableBean(); try { //将value对象复制给tmpOrderBean BeanUtils.copyProperties(tmpOrderBean,value); } catch (IllegalAccessException e) { throw new RuntimeException(e); } catch (InvocationTargetException e) { throw new RuntimeException(e); } //将临时TableBean对象添加到集合orderBeans orderBeans.add(tmpOrderBean); }else { //商品表 try { BeanUtils.copyProperties(pdBean,value); } catch (IllegalAccessException e) { throw new RuntimeException(e); } catch (InvocationTargetException e) { throw new RuntimeException(e); } } } //便利集合orderBeans,替换掉每个orderBean的pid为pname,然后写出 for (TableBean orderBean : orderBeans) { orderBean.setPname(pdBean.getPname()); //写出修改后的orderBean对象 context.write(orderBean,NullWritable.get()); } } }
(4)创建Driver类
package org.example._09ReduceJoin; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import java.io.IOException; /** * @ClassName TableDriver * @Description TODO * @Author Zouhuiming * @Date 2023/5/22 15:39 * @Version 1.0 */ public class TableDriver { public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException { Configuration configuration=new Configuration(); Job job=Job.getInstance(configuration); job.setJarByClass(TableDriver.class); job.setMapperClass(TableMapper.class); job.setReducerClass(TableReducer.class); job.setMapOutputKeyClass(IntWritable.class); job.setMapOutputValueClass(TableBean.class); job.setOutputKeyClass(TableBean.class); job.setOutputValueClass(NullWritable.class); FileInputFormat.addInputPath(job,new Path("E:\\test\\input4")); FileOutputFormat.setOutputPath(job,new Path("E:\\test\\output8")); System.exit(job.waitForCompletion(true)?0:1); } }
4、测试
运行结果
1004 小米 4 1001 小米 1 1005 华为 5 1002 华为 2 1006 格力 6 1003 格力 3
5、总结
缺点:这种方式中,合并的操作是在Reduce阶段完成的,Reduce端的处理压力太大,Map节点的运算负载很低,资源利用率不高,且在Reduce阶段极易产生数据倾斜。
解决方法:Map端实现数据合并。
3.3 Mapper Join
1、使用场景
Map Join适合于一张表很小、一张表很大的场景。
2、优点:
在 Map 端缓存多张表,提前处理业务逻辑,这样增加 Map 端业务,减少 Reduce 端数据的压力,尽可能的减少数据倾斜。
3、具体办法
(1)在 Mapper 的 setup 阶段,将文件读取到缓存集合中。
(2)在 Driver 驱动类中加载缓存。
//缓存普通文件到 Task 运行节点。 job.addCacheFile(new URI("file:///e:/cache/pd.txt")); //如果是集群运行,需要设置 HDFS 路径 job.addCacheFile(new URI("hdfs://hadoop102:8020/cache/pd.txt"));
3.4 Map Join案例实操
1、需求
2、需求分析
3、实现代码
(1)先在Driver中添加缓存文件
package org.example._10MapperJoin; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import java.io.IOException; import java.net.URI; import java.net.URISyntaxException; /** * @ClassName MapJoinDriver * @Description TODO * @Author Zouhuiming * @Date 2023/5/22 16:16 * @Version 1.0 */ public class MapJoinDriver { public static void main(String[] args) throws IOException, URISyntaxException, InterruptedException, ClassNotFoundException { // 1 获取 job 信息 Configuration conf = new Configuration(); Job job = Job.getInstance(conf); // 2 设置加载 jar 包路径 job.setJarByClass(MapJoinDriver.class); // 3 关联 mapper job.setMapperClass(MapJoinMapper.class); // 4 设置 Map 输出 KV 类型 job.setMapOutputKeyClass(Text.class); job.setMapOutputValueClass(NullWritable.class); // 5 设置最终输出 KV 类型 job.setOutputKeyClass(Text.class); job.setOutputValueClass(NullWritable.class); // 加载缓存数据 job.addCacheFile(new URI("file:///E:/test/input4/pd.txt")); // Map 端 Join 的逻辑不需要 Reduce 阶段,设置 reduceTask 数量为 0 job.setNumReduceTasks(0); // 6 设置输入输出路径 FileInputFormat.setInputPaths(job, new Path("E:\\test\\input5")); FileOutputFormat.setOutputPath(job, new Path("E:\\test\\output9")); // 7 提交 boolean b = job.waitForCompletion(true); System.exit(b ? 0 : 1); } }
(2)Mapper(在setup方法中读取缓冲文件)
package org.example._10MapperJoin; import org.apache.commons.lang.StringUtils; import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.IOUtils; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Mapper; import java.io.BufferedReader; import java.io.IOException; import java.io.InputStreamReader; import java.net.URI; import java.util.HashMap; import java.util.Map; /** * @ClassName MapJoinMapper * @Description TODO * @Author Zouhuiming * @Date 2023/5/22 16:17 * @Version 1.0 */ public class MapJoinMapper extends Mapper<LongWritable, Text,Text, NullWritable> { private Map<String,String> pdMap=new HashMap<>(); Text text=new Text(); //任务开始之前将pd数据缓存进pdMap @Override protected void setup(Context context) throws IOException, InterruptedException { //通过缓存文件得到小表数据pd.txt URI[] cacheFiles = context.getCacheFiles(); Path path=new Path(cacheFiles[0]); //获取文件系统对象,并开流 FileSystem fileSystem = FileSystem.get(context.getConfiguration()); FSDataInputStream open = fileSystem.open(path); //通过包装流转换为reader,方便按行读取 BufferedReader bufferedReader = new BufferedReader(new InputStreamReader(open, "UTF-8")); //逐行读取,按行处理 String line; while (StringUtils.isNotEmpty(line=bufferedReader.readLine())){ //切割一行 //01 小米 String[] split = line.split("\t"); pdMap.put(split[0],split[1]); } //关流 IOUtils.closeStream(bufferedReader); } @Override protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, NullWritable>.Context context) throws IOException, InterruptedException { //读取大表数据 String[] split = value.toString().split("\t"); //通过大表每行数据的pid,去pdMap里面取出pname String name=pdMap.get(split[1]); //将大表每行数据的pid替换为pname text.set(split[0]+"\t"+name+"\t"+split[2]); context.write(text,NullWritable.get()); } }
4、数据清洗
“ETL,是英文 Extract-Transform-Load 的缩写,用来描述将数据从来源端经过抽取(Extract)、转换(Transform)、加载(Load)至目的端的过程。ETL 一词较常用在数据仓库,但其对象并不限于数据仓库。
在运行核心业务 MapReduce 程序之前,往往要先对数据进行清洗,清理掉不符合用户要求的数据。清理的过程往往只需要运行 Mapper 程序,不需要运行 Reduce 程序。
1、需求
去除日志中字段个数小于等于 11 的日志。
(1)输入数据
链接:https://pan.baidu.com/s/1z_nM2e3JrHHZL_5WfbxjaQ
提取码:zhm6
2、需求分析
需要在 Map 阶段对输入的数据根据规则进行过滤清洗。
3、实现代码
(1)Mapper类
package org.example._11ETL; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.yarn.webapp.hamlet2.Hamlet; import java.io.IOException; /** * @ClassName ETLMapper * @Description TODO * @Author Zouhuiming * @Date 2023/5/22 16:33 * @Version 1.0 */ public class ETLMapper extends Mapper<LongWritable, Text,Text, NullWritable> { @Override protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, NullWritable>.Context context) throws IOException, InterruptedException { //1、获取一行数据 String line=value.toString(); //2、解析日志 boolean result=parseLog(line,context); //3、日志不合法 if (!result){ return; } //4、日志合法就直接写出 context.write(value,NullWritable.get()); } //封装解析日志的方法 private boolean parseLog(String line, Mapper<LongWritable, Text, Text, NullWritable>.Context context) { //1、截取 String[] fields = line.split(" "); //2、日志长度大于11的为合法 if (fields.length>11){ return true; } else { return false; } } }
(2)Driver类
package org.example._11ETL; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import org.example._08outputformat.LogDriver; import java.io.IOException; /** * @ClassName ETLDriver * @Description TODO * @Author Zouhuiming * @Date 2023/5/22 16:39 * @Version 1.0 */ public class ETLDriver { public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException { // 输入输出路径需要根据自己电脑上实际的输入输出路径设置 args = new String[] { "E:\\test\\input6", "E:\\test\\output10" }; // 1 获取 job 信息 Configuration conf = new Configuration(); Job job = Job.getInstance(conf); // 2 加载 jar 包 job.setJarByClass(LogDriver.class); // 3 关联 map job.setMapperClass(ETLMapper.class); // 4 设置最终输出类型 job.setOutputKeyClass(Text.class); job.setOutputValueClass(NullWritable.class); // 设置 reducetask 个数为 0 job.setNumReduceTasks(0); // 5 设置输入和输出路径 FileInputFormat.setInputPaths(job, new Path(args[0])); FileOutputFormat.setOutputPath(job, new Path(args[1])); // 6 提交 boolean b = job.waitForCompletion(true); System.exit(b ? 0 : 1); } }
5、MapReduce开发总结
1、输入数据接口:InputFormat
(1)默认使用的实现类是TextInputFormat
(2)TextInputFormat的功能逻辑是:一次读一行文本,然后将该行的起始偏移量作为key,行内容作为value返回。
(3)CombineTextInputFormat可以把多个小文件合并成一个切片处理,提高处理效率。
2、逻辑处理接口:Mapper
用户根据业务需求实现其中的三个方法:map() setup() cleanup()
3、Partitioner分区
(1)有默认实现HashPartitioner,逻辑是根据key的哈希值和numReduces来返回一个分区号;key.hashCode()&Integer.MAXVALUE % numReduces
(2)如果业务上有特别的需求,可以自定义分区。
4、Comparable排序
(1)当我们用自定义的对象作为key来输出时,就必须要实现WritableComparable接口,重写其中的compareTo()方法。
(2)部分排序:对最输出的每一个文件内部进行排序。
(3)全排序:对所以数据进行排序,通常只有一个Reduce。
(4)二次排序:排序的条件有两个。
5、Combiner合并
Combiner合并可以提高程序执行效率,减少IO传输。但是使用时必须不能影响原有的业务处理结果。
6、逻辑处理接口:Reducer
用户根据业务需求实现其中的三个方法: reduce() setup() cleanup()
7、输出数据接口:OutputFormat
(1)默认实现类是TextOutputFormat,功能逻辑是:将每一个KV对,向目标文本文件输出一行。
(2)用户还可以自定义OutputFormat。