Hadoop学习:MapReduce不使用Reduce将表合并提高效率

简介: Hadoop学习:MapReduce不使用Reduce将表合并提高效率

一、✌题目要求

record表:

ID 城市编号 空气指数
001 03 245
002 02 655
003 05 743
004 04 246
005 02 956
006 01 637
007 05 831
008 03 683
009 02 349

city表:

城市编号 城市名称
01 长沙
02 株洲
03 湘潭
04 怀化
05 岳阳

目标表:

ID 城市名称 空气指数
001 湘潭 245
002 株洲 655
003 岳阳 743
004 怀化 246
005 株洲 956
006 长沙 637
007 岳阳 831
008 湘潭 683
009 株洲 349

三、✌代码实现

1.✌Bean类

import org.apache.hadoop.io.Writable;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
public class Bean implements Writable {
    private String id;
    private String pid;
    private int amount;
    private String pname;
    private String type;
    public Bean() {
        super();
    }
    public Bean(String id, String pid, int amount, String pname, String type) {
        this.id = id;
        this.pid = pid;
        this.amount = amount;
        this.pname = pname;
        this.type = type;
    }
    @Override
    public String toString() {
        return id + "\t" + pname + "\t\t" + amount;
    }
    public String getId() {
        return id;
    }
    public void setId(String id) {
        this.id = id;
    }
    public String getPid() {
        return pid;
    }
    public void setPid(String pid) {
        this.pid = pid;
    }
    public int getAmount() {
        return amount;
    }
    public void setAmount(int amount) {
        this.amount = amount;
    }
    public String getPname() {
        return pname;
    }
    public void setPname(String pname) {
        this.pname = pname;
    }
    public String getType() {
        return type;
    }
    public void setType(String type) {
        this.type = type;
    }
    @Override
    public void write(DataOutput out) throws IOException {
        out.writeUTF(id);
        out.writeUTF(pid);
        out.writeInt(amount);
        out.writeUTF(pname);
        out.writeUTF(type);
    }
    @Override
    public void readFields(DataInput in) throws IOException {
        id = in.readUTF();
        pid = in.readUTF();
        amount = in.readInt();
        pname = in.readUTF();
        type = in.readUTF();
    }
}

2.✌Map类

import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import java.io.BufferedReader;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStreamReader;
import java.net.URI;
import java.util.HashMap;
public class Map extends Mapper<LongWritable, Text, Bean, NullWritable> {
    HashMap<String, String> map = new HashMap();
    @Override
    protected void setup(Context context) throws IOException, InterruptedException {
        //获取缓冲数据
        URI[] cacheFiles = context.getCacheFiles();
        String path = cacheFiles[0].getPath().toString();
        //创建缓冲流
        BufferedReader reader = new BufferedReader(new InputStreamReader(new FileInputStream(path)));
        String line;
        while (StringUtils.isNotEmpty(line = reader.readLine())) {
            String[] words = line.split("\t");
            map.put(words[0], words[1]);
        }
        reader.close();
    }
    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        String line = value.toString();
        String[] words = line.split("\t");
        context.write(new Bean(words[0], words[1], Integer.parseInt(words[2]), map.get(words[1]), ""), NullWritable.get());
    }
}

3.✌Driver类

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.log4j.BasicConfigurator;
import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
public class Driver {
    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException, URISyntaxException {
        args = new String[]{"D:/input/inputword", "D:/output"};
        BasicConfigurator.configure();
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);
        job.setJarByClass(Driver.class);
        job.setMapperClass(Map.class);
        job.addCacheFile(new URI("file:///D:/input/inputcache/pd.txt"));
        job.setNumReduceTasks(0);
        job.setOutputKeyClass(Bean.class);
        job.setOutputValueClass(NullWritable.class);
        FileInputFormat.setInputPaths(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        boolean result = job.waitForCompletion(true);
        System.exit(result ? 0 : 1);
    }
}


目录
相关文章
|
22天前
|
缓存 分布式计算 算法
优化Hadoop MapReduce性能的最佳实践
【8月更文第28天】Hadoop MapReduce是一个用于处理大规模数据集的软件框架,适用于分布式计算环境。虽然MapReduce框架本身具有很好的可扩展性和容错性,但在某些情况下,任务执行可能会因为各种原因导致性能瓶颈。本文将探讨如何通过调整配置参数和优化算法逻辑来提高MapReduce任务的效率。
90 0
|
3月前
|
分布式计算 Hadoop Java
Hadoop MapReduce编程
该教程指导编写Hadoop MapReduce程序处理天气数据。任务包括计算每个城市ID的最高、最低气温、气温出现次数和平均气温。在读取数据时需忽略表头,且数据应为整数。教程中提供了环境变量设置、Java编译、jar包创建及MapReduce执行的步骤说明,但假设读者已具备基础操作技能。此外,还提到一个扩展练习,通过分区功能将具有相同尾数的数字分组到不同文件。
44 1
|
3月前
|
数据采集 SQL 分布式计算
|
3月前
|
存储 分布式计算 Hadoop
Hadoop生态系统详解:HDFS与MapReduce编程
Apache Hadoop是大数据处理的关键,其核心包括HDFS(分布式文件系统)和MapReduce(并行计算框架)。HDFS为大数据存储提供高容错性和高吞吐量,采用主从结构,通过数据复制保证可靠性。MapReduce将任务分解为Map和Reduce阶段,适合大规模数据集的处理。通过代码示例展示了如何使用MapReduce实现Word Count功能。HDFS和MapReduce的结合,加上YARN的资源管理,构成处理和分析大数据的强大力量。了解和掌握这些基础对于有效管理大数据至关重要。【6月更文挑战第12天】
106 0
|
3月前
|
分布式计算 Java Hadoop
简单的java Hadoop MapReduce程序(计算平均成绩)从打包到提交及运行
简单的java Hadoop MapReduce程序(计算平均成绩)从打包到提交及运行
41 0
|
3月前
|
分布式计算 Hadoop Java
使用Hadoop MapReduce分析邮件日志提取 id、状态 和 目标邮箱
使用Hadoop MapReduce分析邮件日志提取 id、状态 和 目标邮箱
|
数据采集 分布式计算 搜索推荐
Hadoop学习---7、OutputFormat数据输出、MapReduce内核源码解析、Join应用、数据清洗、MapReduce开发总结(一)
Hadoop学习---7、OutputFormat数据输出、MapReduce内核源码解析、Join应用、数据清洗、MapReduce开发总结(一)
|
存储 分布式计算 Hadoop
Hadoop基础学习---6、MapReduce框架原理(一)
Hadoop基础学习---6、MapReduce框架原理(一)
|
存储 分布式计算 Hadoop
【Hadoop】一个例子带你了解MapReduce
【Hadoop】一个例子带你了解MapReduce
85 1
|
数据采集 缓存 分布式计算
Hadoop学习---7、OutputFormat数据输出、MapReduce内核源码解析、Join应用、数据清洗、MapReduce开发总结(二)
Hadoop学习---7、OutputFormat数据输出、MapReduce内核源码解析、Join应用、数据清洗、MapReduce开发总结(二)

相关实验场景

更多