概述
MapReduce是一个计算模型和框架,Hadoop将其实现并整合,因此我们可以脱离Hadoop软件环境直接在项目中编码测试。在实际的生产环境中,每个计算任务都是以jar包的形式存在,周期性的以不同的参数提交执行,这是离线计算任务的常见模式。当然,在测试阶段,可能也会使用yarn方式远程提交集群来执行任务,这可以更加真实的模拟任务的运行情况,同时也方便调试。
一、本地运行
1. 方式特点
本地运行方式主要是为了方便程序的调试,数据的输入与结果的输出都使用本地文件系统,完整案例代码可以参考:一个例子带你了解MapReduce。
2. 动态参数
这里我们为了后续的打包部署方便调整参数,将Executor类修改为动态传参:
import edu.sand.mapper.WordCountMapper; import edu.sand.reducer.WordCountReducer; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configured; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import org.apache.hadoop.util.Tool; import org.apache.hadoop.util.ToolRunner; public class WordCountExecutor extends Configured implements Tool { @Override public int run(String[] args) throws Exception { // 初始化配置,可以通过这个对象设置各种参数 Configuration conf = new Configuration(); // 完成Job初始化,设置任务名称 Job job = Job.getInstance(conf, "wordCount"); // 设置Job的运行主类 job.setJarByClass(WordCountExecutor.class); // 设置Map阶段的执行类 job.setMapperClass(WordCountMapper.class); // 设置Map阶段的数据输出类型 job.setMapOutputKeyClass(Text.class); job.setMapOutputValueClass(LongWritable.class); // 设置Reduce阶段的执行类 job.setReducerClass(WordCountReducer.class); // 设置Reduce阶段的数据输出类型 job.setOutputKeyClass(Text.class); job.setOutputValueClass(LongWritable.class); // 指定数据输入文件路径,如果指定的是文件夹,将读取目录下所有文件 FileInputFormat.setInputPaths(job, new Path(args[0])); // 指定结果输出文件路径,最后一级路径会自动创建,每次重新执行时需要删除或修改名称 FileOutputFormat.setOutputPath(job, new Path(args[1])); // 使用job调用执行,true代表显示详细信息,成功时返回0 return job.waitForCompletion(true) ? 0 : -1; } public static void main(String[] args) throws Exception { // 调用执行 ToolRunner.run(new Configuration(), new WordCountExecutor(), args); } }
3. 执行程序
在执行前,我们将需要传入的参数填写在运行/调试配置中:
二、远程提交
1. 方式特点
这种方式需要先搭建好一个Hadoop集群,然后将任务提交到该集群运行,一般用于最终的测试阶段,检查代码是否可以在集群中正常运行。也可以用于在学习开发阶段处理大规模的数据,Hadoop的集群搭建步骤可以参考:Hadoop 3.x各模式部署 - Ubuntu。
特别注意:如果你的Hadoop环境使用的是伪分布模式,则需要保证它可以被远程访问到,也就是说Hadoop的配置文件中不要使用127.0.0.1【也要留意/etc/hosts文件中是否存在默认主机名的ip映射】,否则将导致只能在Hadoop所在的机器上使用,可以使用网卡IP或新映射另一个主机名,然后进行配置。
2. 插件准备
- 插件安装
笔者使用IDEA工具进行开发,使用其中的Big Data Tools可以方便的连接到各种大数据组件,包括HDFS等。
在使用远程提交时,因为是在集群上运行,所以此时会使用HDFS,我们可以先通过插件上传需要的数据源,也可以查看到结果文件。
- 插件启用
一般情况下,插件会停靠在窗口的右侧,如果没有出现可以手动开启:
开启后的效果如下:
- 新建连接:
双击HDFS按钮,然后填写Hadoop集群的远程地址和用户名,再点击测试连接,没问题后点击确定。
双击打开连接后即可查看到HDFS,但是不能够操作根目录,除此之外都可以通过右键菜单完成,我们可以将需要用到的数据文件上传到对应的路径。
如果根目录需要创建目录,可以通过命令或者在Hadoop网页管理界面完成:
3. 代码修改
- 资源文件
由于需要提交到集群运行,所以集群地址等信息都需要手动配置进去,但是其中可能存在许多的配置项,我们可以直接将Hadoop的配置文件拷贝到Resours文件夹:
- conf属性
在Job初始化之前,添加如下配置项:
// 设置用户名称为Hadoop集群启动的用户名,可以避免一些权限问题 System.setProperty("HADOOP_USER_NAME", "hadoop"); // 初始化配置,可以通过这个对象设置各种参数 Configuration conf = new Configuration(); // 指定配置文件,自动加载其中的配置项 conf.addResource("core-site.xml"); conf.addResource("hdfs-site.xml"); conf.addResource("mapred-site.xml"); conf.addResource("yarn-site.xml"); // 完成Job初始化,设置任务名称 Job job = Job.getInstance(conf, "wordCount"); // 设置任务执行所需要的jar包 job.setJar("/Path/to/wordCount.jar");
至少保证其中包含以下配置【可以手动配置】:
// 端口号修改为实际的集群配置 conf.set("fs.defaultFS", "hdfs://ubuntu:8020"); conf.set("mapreduce.framework.name", "yarn"); conf.set("yarn.resourcemanager.address", "ubuntu:8032"); // Windows用户可能需要添加以下配置 conf.set("mapreduce.app-submission.cross-platform", "true");
- 运行参数
此时的运行参数代表的是HDFS的路径,需要特别注意,如果路径最前方没有出现斜杠,则是代表相对路径,比如:input/
代表的是 /user/hadoop/input
,所以如果想指定为根路径下的文件夹,一定要在开头添加斜杠:
4. 打包流程
在初始化Job后,需要手动构建一个jar包并进行配置,并使用job.setJar替换掉原有的setJarByClass【其实保留也没有影响,但是强迫症促使我把没用的都删掉】。
- 打开项目结构 - 工件【Artifacts】
- 创建空的jar包,手动添加所需要的内容
因为我们不需要把Maven依赖的所有东西都放进去,只需要将编译后程序放进去就可以了。
- 添加模块输出
修改jar包名称,检查完整路径,与代码中的配置一致。添加模块输出,本例中MANIFEST.MF文件可以忽略,或者创建一个默认的放在resources文件夹下也可以。
5. 结果查看
使用之前配置好的运行参数执行即可,可以在Big Data Tools插件中查看到结果:
同时也可以在管理界面看到运行的任务信息:
三、打包部署
1. 方式特点
在生产环境中,MapReduce程序的最终产物是jar包,会提交到集群运行。在打包时,代码中一般不需要额外添加关于集群参数的配置。可以使用本地运行模式的代码,同时重新编辑一下jar包的生成即可。
2. Jar包传输
按照之前的流程进行打包,由于在运行时可以指定要运行的主类以及相关的参数,所以只需要确保jar包中包含所需的程序即可。打包完成后,可以使用SFTP工具将jar包文件发送到远程集群。同样可以使用IDEA Big Data Tools插件来创建一个SFTP连接:
3. 结果查看
jar包传输完成后,使用以下命令提交执行【执行前记得删除结果目录】:
hadoop jar wordCount.jar package.name.WordCountExecutor /input /output/wordCount
使用hadoop jar命令时需要以下几个参数:
- jar包完整路径
- 要执行的主类
- 需要传入的参数【如有】
四、案例源码
涉及到使用配置文件的部分,请修改为自己集群的参数。