8. Implementing JOIN on the Map Side
8.1 Overview
This approach applies when one of the tables being joined is small.
Using the distributed cache, the small table can be shipped to every map node. Each map task can then join the large-table records it reads against the small table locally and emit the final result directly, which greatly increases the parallelism of the join and speeds up processing.
8.2 Implementation Steps
First, load the small table up front in the Mapper class and perform the join there.
This mirrors a common real-world solution: load the lookup data ("the database") once, then reuse it for every record.
- Define the Mapper
import org.apache.hadoop.filecache.DistributedCache;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.net.URI;
import java.util.HashMap;
import java.util.Map;

public class MapperTask extends Mapper<LongWritable, Text, Text, Text> {

    private Map<String, String> map = new HashMap<>();

    // Initialization method, called only once per map task
    @Override
    protected void setup(Context context) throws IOException, InterruptedException {
        URI[] cacheFiles = DistributedCache.getCacheFiles(context.getConfiguration());
        URI fileURI = cacheFiles[0];
        FileSystem fs = FileSystem.get(fileURI, context.getConfiguration());
        FSDataInputStream inputStream = fs.open(new Path(fileURI));
        BufferedReader bufferedReader = new BufferedReader(new InputStreamReader(inputStream));
        String readLine = "";
        while ((readLine = bufferedReader.readLine()) != null) {
            // readLine: one line of the product table
            String[] split = readLine.split(",");
            String pid = split[0];
            map.put(pid, split[1] + "\t" + split[2] + "\t" + split[3]);
        }
    }

    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        // 1. Read one line of the orders data
        String line = value.toString();
        // 2. Split the line
        String[] split = line.split(",");
        String pid = split[2];
        // 3. Look up the product information in the in-memory map
        String product = map.get(pid);
        // 4. Emit the joined record to the output
        context.write(new Text(pid), new Text(split[0] + "\t" + split[1] + "\t" + product + "\t" + split[3]));
    }
}
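One detail the mapper above does not handle: if an order line references a pid that is missing from the cached product table, map.get(pid) returns null and the literal string "null" ends up in the output. A small defensive check (an illustrative addition, not part of the original code) would be:

    String product = map.get(pid);
    if (product == null) {
        // Product not found in the small table: skip this record
        // (or emit a placeholder, depending on the desired join semantics).
        return;
    }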
- Define the driver (main) class
import com.itheima.join.reduce.JobReduceJoinMain;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.filecache.DistributedCache;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

import java.net.URI;

public class JobMapperJoinMain extends Configured implements Tool {

    @Override
    public int run(String[] args) throws Exception {
        // Register the cache file. This must be done at the very start of run();
        // if it is placed after the Job has been created, it has no effect.
        // The cached file must be stored on HDFS, otherwise it is also ignored.
        DistributedCache.addCacheFile(new URI("hdfs://node01:8020/cache/pdts.txt"), super.getConf());

        // 1. Create the job
        Job job = Job.getInstance(super.getConf(), "jobMapperJoinMain");

        job.setInputFormatClass(TextInputFormat.class);
        TextInputFormat.addInputPath(job, new Path("E:\\传智工作\\上课\\北京大数据30期\\大数据第六天\\资料\\map端join\\map_join_iput"));

        job.setMapperClass(MapperTask.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Text.class);

        job.setOutputFormatClass(TextOutputFormat.class);
        TextOutputFormat.setOutputPath(job, new Path("E:\\传智工作\\上课\\北京大数据30期\\大数据第六天\\资料\\map端join\\out_put_map"));

        boolean b = job.waitForCompletion(true);
        return b ? 0 : 1;
    }

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        JobMapperJoinMain jobMapperJoinMain = new JobMapperJoinMain();
        int i = ToolRunner.run(conf, jobMapperJoinMain, args);
        System.exit(i);
    }
}
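Note that org.apache.hadoop.filecache.DistributedCache is deprecated in Hadoop 2.x and later. A minimal sketch of the same idea with the newer cache API (same pdts.txt file as above; this variant is an assumption, not part of the original example):

// Driver side: register the cache file on the Job itself
// (replaces the DistributedCache.addCacheFile call above, and can be done after Job.getInstance)
job.addCacheFile(new URI("hdfs://node01:8020/cache/pdts.txt"));

// Mapper side: the cached file is localized into the task's working directory
// under its original file name, so it can be read like an ordinary local file.
// (Also requires import java.io.FileReader.)
@Override
protected void setup(Context context) throws IOException, InterruptedException {
    BufferedReader reader = new BufferedReader(new FileReader("pdts.txt"));
    String line;
    while ((line = reader.readLine()) != null) {
        String[] split = line.split(",");
        map.put(split[0], split[1] + "\t" + split[2] + "\t" + split[3]);
    }
    reader.close();
}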
9. Social Follower Data Analysis
9.1 Requirements Analysis
Below is QQ friend-list data. The part before the colon is a user; the part after the colon is all of that user's friends (the friend relationship in this data is one-directional).
A:B,C,D,F,E,O
B:A,C,E,K
C:A,B,D,E,I
D:A,E,F,L
E:B,C,D,M,L
F:A,B,C,D,E,O,M
G:A,C,D,E,F
H:A,C,D,E,O
I:A,O
J:B,O
K:A,C,D
L:D,E,F
M:E,F,G
O:A,H,I,J
Find every pair of users that have common friends, and for each pair, who those common friends are.
【Solution Approach】
Step 1
  map:    reading the line A:B,C,D,F,E,O emits <B,A> <C,A> <D,A> <F,A> <E,A> <O,A>;
          reading the next line B:A,C,E,K emits <A,B> <C,B> <E,B> <K,B>
  reduce: receives data such as <C,A> <C,B> <C,E> <C,F> <C,G> ...
          and emits <A-B,C> <A-E,C> <A-F,C> <A-G,C> <B-E,C> <B-F,C> ...
Step 2
  map:    reads a line <A-B,C> and emits <A-B,C> unchanged
  reduce: receives <A-B,C> <A-B,F> <A-B,G> ...
          and emits A-B    C,F,G,...
9.2 Implementation Steps
First MapReduce job
【Mapper class】
public class Step1Mapper extends Mapper<LongWritable, Text, Text, Text> {
    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        // 1. Split the line on the colon: the left side of the colon is V2 (the user)
        String[] split = value.toString().split(":");
        String userStr = split[0];

        // 2. Split the right side of the colon on commas: each element is a K2 (a friend)
        String[] split1 = split[1].split(",");
        for (String s : split1) {
            // 3. Write K2 and V2 to the context
            context.write(new Text(s), new Text(userStr));
        }
    }
}
【Reducer class】
public class Step1Reducer extends Reducer<Text, Text, Text, Text> {
    @Override
    protected void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
        // 1. Iterate over the values and concatenate them to build K3
        //    (the list of users who all have this friend)
        StringBuffer buffer = new StringBuffer();
        for (Text value : values) {
            buffer.append(value.toString()).append("-");
        }
        // 2. The original K2 (the friend) becomes V3
        // 3. Write K3 and V3 to the context
        context.write(new Text(buffer.toString()), key);
    }
}
JobMain:
public class JobMain extends Configured implements Tool {
    @Override
    public int run(String[] args) throws Exception {
        // 1. Create the Job object
        Job job = Job.getInstance(super.getConf(), "common_friends_step1_job");

        // 2. Configure the job
        // Step 1: set the input format and input path
        job.setInputFormatClass(TextInputFormat.class);
        TextInputFormat.addInputPath(job, new Path("file:///D:\\input\\common_friends_step1_input"));

        // Step 2: set the Mapper class and its output types
        job.setMapperClass(Step1Mapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Text.class);

        // Steps 3, 4, 5, 6: partition, sort, combine, group — the defaults are used here

        // Step 7: set the Reducer class and its output types
        job.setReducerClass(Step1Reducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);

        // Step 8: set the output format and output path
        job.setOutputFormatClass(TextOutputFormat.class);
        TextOutputFormat.setOutputPath(job, new Path("file:///D:\\out\\common_friends_step1_out"));

        // 3. Wait for the job to finish
        boolean bl = job.waitForCompletion(true);
        return bl ? 0 : 1;
    }

    public static void main(String[] args) throws Exception {
        Configuration configuration = new Configuration();
        // Launch the job
        int run = ToolRunner.run(configuration, new JobMain(), args);
        System.exit(run);
    }
}
Second MapReduce job
【Mapper class】
public class Step2Mapper extends Mapper<LongWritable, Text, Text, Text> {
    /*
      K1      V1
      0       A-F-C-J-E-    B
      ----------------------------------
      K2      V2
      A-C     B
      A-E     B
      A-F     B
      C-E     B
     */
    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        // 1. Split the line on the tab character; the second part is V2 (the common friend)
        String[] split = value.toString().split("\t");
        String friendStr = split[1];

        // 2. Split the first part on '-' to get the array of users
        String[] userArray = split[0].split("-");

        // 3. Sort the array, so that e.g. "B-A" and "A-B" always produce the same pair key
        Arrays.sort(userArray);

        // 4. Combine the array elements pairwise to get K2
        /*
           A-E-C  ----->  A  C  E
                          A  C  E
                          A  C  E
         */
        for (int i = 0; i < userArray.length - 1; i++) {
            for (int j = i + 1; j < userArray.length; j++) {
                // 5. Write K2 and V2 to the context
                context.write(new Text(userArray[i] + "-" + userArray[j]), new Text(friendStr));
            }
        }
    }
}
【Reducer class】
public class Step2Reducer extends Reducer<Text, Text, Text, Text> {
    @Override
    protected void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
        // 1. The original K2 (the user pair) is reused as K3
        // 2. Iterate over the values and concatenate them to build V3 (the list of common friends)
        StringBuffer buffer = new StringBuffer();
        for (Text value : values) {
            buffer.append(value.toString()).append("-");
        }
        // 3. Write K3 and V3 to the context
        context.write(key, new Text(buffer.toString()));
    }
}
【JobMain】
public class JobMain extends Configured implements Tool {
    @Override
    public int run(String[] args) throws Exception {
        // 1. Create the Job object
        Job job = Job.getInstance(super.getConf(), "common_friends_step2_job");

        // 2. Configure the job
        // Step 1: set the input format and input path (the output of step 1)
        job.setInputFormatClass(TextInputFormat.class);
        TextInputFormat.addInputPath(job, new Path("file:///D:\\out\\common_friends_step1_out"));

        // Step 2: set the Mapper class and its output types
        job.setMapperClass(Step2Mapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Text.class);

        // Steps 3, 4, 5, 6: partition, sort, combine, group — the defaults are used here

        // Step 7: set the Reducer class and its output types
        job.setReducerClass(Step2Reducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);

        // Step 8: set the output format and output path
        job.setOutputFormatClass(TextOutputFormat.class);
        TextOutputFormat.setOutputPath(job, new Path("file:///D:\\out\\common_friends_step2_out"));

        // 3. Wait for the job to finish
        boolean bl = job.waitForCompletion(true);
        return bl ? 0 : 1;
    }

    public static void main(String[] args) throws Exception {
        Configuration configuration = new Configuration();
        // Launch the job
        int run = ToolRunner.run(configuration, new JobMain(), args);
        System.exit(run);
    }
}
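The two steps above are written as separate drivers with hard-coded paths, and step 2 simply reads step 1's output directory. They can also be chained in a single driver; the sketch below runs them back to back (the class name CommonFriendsDriver and the combined structure are assumptions, not part of the original example; it assumes the same imports as the JobMain classes above):

public class CommonFriendsDriver extends Configured implements Tool {
    @Override
    public int run(String[] args) throws Exception {
        // Job 1: friend -> concatenated list of users who share that friend
        Job job1 = Job.getInstance(super.getConf(), "common_friends_step1_job");
        job1.setJarByClass(CommonFriendsDriver.class);
        job1.setMapperClass(Step1Mapper.class);
        job1.setReducerClass(Step1Reducer.class);
        job1.setOutputKeyClass(Text.class);
        job1.setOutputValueClass(Text.class);
        TextInputFormat.addInputPath(job1, new Path("file:///D:\\input\\common_friends_step1_input"));
        TextOutputFormat.setOutputPath(job1, new Path("file:///D:\\out\\common_friends_step1_out"));
        if (!job1.waitForCompletion(true)) {
            return 1; // stop if step 1 fails
        }

        // Job 2: user pair -> list of common friends, reading step 1's output
        Job job2 = Job.getInstance(super.getConf(), "common_friends_step2_job");
        job2.setJarByClass(CommonFriendsDriver.class);
        job2.setMapperClass(Step2Mapper.class);
        job2.setReducerClass(Step2Reducer.class);
        job2.setOutputKeyClass(Text.class);
        job2.setOutputValueClass(Text.class);
        TextInputFormat.addInputPath(job2, new Path("file:///D:\\out\\common_friends_step1_out"));
        TextOutputFormat.setOutputPath(job2, new Path("file:///D:\\out\\common_friends_step2_out"));
        return job2.waitForCompletion(true) ? 0 : 1;
    }

    public static void main(String[] args) throws Exception {
        System.exit(ToolRunner.run(new Configuration(), new CommonFriendsDriver(), args));
    }
}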
10. Building an Inverted Index
10.1 Requirements Analysis
Requirement: there is a large amount of text (documents, web pages) for which a search index needs to be built.
Approach:
First, read all the words out of each document; for each word, append the document's name to form the key, with a value of 1, so the data is organized in the following form.
Map-side output:
hello-a.txt 1
hello-a.txt 1
hello-a.txt 1
Reduce-side output:
hello-a.txt 3
10.2 Code Implementation
public class IndexCreate extends Configured implements Tool {

    public static void main(String[] args) throws Exception {
        ToolRunner.run(new Configuration(), new IndexCreate(), args);
    }

    @Override
    public int run(String[] args) throws Exception {
        Job job = Job.getInstance(super.getConf(), IndexCreate.class.getSimpleName());

        job.setInputFormatClass(TextInputFormat.class);
        TextInputFormat.addInputPath(job, new Path("file:///D:\\倒排索引\\input"));

        job.setMapperClass(IndexCreateMapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);

        job.setReducerClass(IndexCreateReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);

        job.setOutputFormatClass(TextOutputFormat.class);
        TextOutputFormat.setOutputPath(job, new Path("file:///D:\\倒排索引\\outindex"));

        boolean bool = job.waitForCompletion(true);
        return bool ? 0 : 1;
    }

    public static class IndexCreateMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
        Text text = new Text();
        IntWritable v = new IntWritable(1);

        @Override
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            // Get the file split for this record
            FileSplit fileSplit = (FileSplit) context.getInputSplit();
            // Get the file name from the split
            String name = fileSplit.getPath().getName();
            String line = value.toString();
            String[] split = line.split(" ");
            // Emit "word--filename" as the key and 1 as the value
            for (String word : split) {
                text.set(word + "--" + name);
                context.write(text, v);
            }
        }
    }

    public static class IndexCreateReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
        IntWritable value = new IntWritable();

        @Override
        protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
            // Sum the occurrences of this word in this file
            int count = 0;
            for (IntWritable value : values) {
                count += value.get();
            }
            value.set(count);
            context.write(key, value);
        }
    }
}
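As written, this job produces one line per word–file pair (e.g. "hello--a.txt" followed by its count), which is what the analysis in 10.1 describes. If a full posting list is wanted, with one line per word listing every file and its count, a small second job can split the key back apart on "--" and concatenate in the reducer. A minimal sketch (the class names and the "-->" separator are illustrative assumptions, not part of the original example):

public static class IndexMergeMapper extends Mapper<LongWritable, Text, Text, Text> {
    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        // Input line: "hello--a.txt<TAB>3"  ->  key "hello", value "a.txt-->3"
        String[] parts = value.toString().split("\t");
        String[] wordAndFile = parts[0].split("--");
        context.write(new Text(wordAndFile[0]), new Text(wordAndFile[1] + "-->" + parts[1]));
    }
}

public static class IndexMergeReducer extends Reducer<Text, Text, Text, Text> {
    @Override
    protected void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
        // Concatenate all "file-->count" entries for this word into one posting list
        StringBuilder postings = new StringBuilder();
        for (Text value : values) {
            postings.append(value.toString()).append("\t");
        }
        context.write(key, new Text(postings.toString().trim()));
    }
}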