大数据基础-从word count开始

本文涉及的产品
云原生大数据计算服务 MaxCompute,5000CU*H 100GB 3个月
云原生大数据计算服务MaxCompute,500CU*H 100GB 3个月
简介: 从word count开始

流程:

1.开发Map阶段代码

2.开发Reduce阶段代码

3.组装job

Map阶段代码:

publicstaticclassMyMapperextendsMapper<LongWritable, Text,Text,LongWritable>{

    Loggerlogger=LoggerFactory.getLogger(MyMapper.class);

    @Override

    protectedvoidmap(LongWritablek1, Textv1, Contextcontext)

            throwsIOException, InterruptedException {

        //输出k1,v1的值

        //System.out.println("<k1,v1>=<"+k1.get()+","+v1.toString()+">");

        //logger.info("<k1,v1>=<"+k1.get()+","+v1.toString()+">");

        //k1 代表的是每一行数据的行首偏移量,v1代表的是每一行内容

        //对获取到的每一行数据进行切割,把单词切割出来

        String[] words=v1.toString().split(" ");

        //迭代切割出来的单词数据

        for (Stringword : words) {

            //把迭代出来的单词封装成<k2,v2>的形式

            Textk2=newText(word);

            LongWritablev2=newLongWritable(1L);

            //把<k2,v2>写出去

            context.write(k2,v2);

        }

    }

}

Reduce阶段代码:

publicstaticclassMyReducerextendsReducer<Text,LongWritable,Text,LongWritable>{

    Loggerlogger=LoggerFactory.getLogger(MyReducer.class);

    @Override

    protectedvoidreduce(Textk2, Iterable<LongWritable>v2s, Contextcontext)

            throwsIOException, InterruptedException {

        //创建一个sum变量,保存v2s的和

        longsum=0L;

        //对v2s中的数据进行累加求和

        for(LongWritablev2: v2s){

            //输出k2,v2的值

            //System.out.println("<k2,v2>=<"+k2.toString()+","+v2.get()+">");

            //logger.info("<k2,v2>=<"+k2.toString()+","+v2.get()+">");

            sum+=v2.get();

        }

        //组装k3,v3

        Textk3=k2;

        LongWritablev3=newLongWritable(sum);

        //输出k3,v3的值

        //System.out.println("<k3,v3>=<"+k3.toString()+","+v3.get()+">");

        //logger.info("<k3,v3>=<"+k3.toString()+","+v3.get()+">");

        // 把结果写出去

        context.write(k3,v3);

    }

}

组装Job:

publicstaticvoidmain(String[] args) {

    try{

        if(args.length!=2){

            //如果传递的参数不够,程序直接退出

            System.exit(100);

        }

        //指定Job需要的配置参数

        Configurationconf=newConfiguration();

        //创建一个Job

        Jobjob=Job.getInstance(conf);

        //注意了:这一行必须设置,否则在集群中执行的时候是找不到WordCountJob这个类的

        job.setJarByClass(WordCountJob.class);

        //指定输入路径(可以是文件,也可以是目录)

        FileInputFormat.setInputPaths(job,newPath(args[0]));

        //指定输出路径(只能指定一个不存在的目录)

        FileOutputFormat.setOutputPath(job,newPath(args[1]));

        //指定map相关的代码

        job.setMapperClass(MyMapper.class);

        //指定k2的类型

        job.setMapOutputKeyClass(Text.class);

        //指定v2的类型

        job.setMapOutputValueClass(LongWritable.class);

        //指定reduce相关的代码

        job.setReducerClass(MyReducer.class);

        //指定k3的类型

        job.setOutputKeyClass(Text.class);

        //指定v3的类型

        job.setOutputValueClass(LongWritable.class);

        //提交job

        job.waitForCompletion(true);

    }catch(Exceptione){

        e.printStackTrace();

    }

}

接下来就可以打包发布到集群

指定mapreduce接收到的第一个参数:文件路径

指定mapreduce接收到的第二个参数:输出目录

访问 http://bigdata01:8088 也可以查看任务输出结果

在out输出目录中,_SUCCESS是一个标记文件,有这个文件表示这个任务执行成功了。 part-r-00000是具体的数据文件,如果有多个reduce任务会产生多个这种文件,多个文件的话会按照从0往下排

还要一点需要注意的 ,part 后面的 r 表示这个结果文件是 reduce 步骤产生的, 如果一个 mapreduce 只有 map阶段没有reduce阶段,那么产生的结果文件是part-m-00000这样的。

相关实践学习
基于MaxCompute的热门话题分析
本实验围绕社交用户发布的文章做了详尽的分析,通过分析能得到用户群体年龄分布,性别分布,地理位置分布,以及热门话题的热度。
SaaS 模式云数据仓库必修课
本课程由阿里云开发者社区和阿里云大数据团队共同出品,是SaaS模式云原生数据仓库领导者MaxCompute核心课程。本课程由阿里云资深产品和技术专家们从概念到方法,从场景到实践,体系化的将阿里巴巴飞天大数据平台10多年的经过验证的方法与实践深入浅出的讲给开发者们。帮助大数据开发者快速了解并掌握SaaS模式的云原生的数据仓库,助力开发者学习了解先进的技术栈,并能在实际业务中敏捷的进行大数据分析,赋能企业业务。 通过本课程可以了解SaaS模式云原生数据仓库领导者MaxCompute核心功能及典型适用场景,可应用MaxCompute实现数仓搭建,快速进行大数据分析。适合大数据工程师、大数据分析师 大量数据需要处理、存储和管理,需要搭建数据仓库?学它! 没有足够人员和经验来运维大数据平台,不想自建IDC买机器,需要免运维的大数据平台?会SQL就等于会大数据?学它! 想知道大数据用得对不对,想用更少的钱得到持续演进的数仓能力?获得极致弹性的计算资源和更好的性能,以及持续保护数据安全的生产环境?学它! 想要获得灵活的分析能力,快速洞察数据规律特征?想要兼得数据湖的灵活性与数据仓库的成长性?学它! 出品人:阿里云大数据产品及研发团队专家 产品 MaxCompute 官网 https://www.aliyun.com/product/odps&nbsp;
目录
相关文章
|
2月前
|
分布式计算 大数据 Java
大数据-86 Spark 集群 WordCount 用 Scala & Java 调用Spark 编译并打包上传运行 梦开始的地方
大数据-86 Spark 集群 WordCount 用 Scala & Java 调用Spark 编译并打包上传运行 梦开始的地方
28 1
大数据-86 Spark 集群 WordCount 用 Scala & Java 调用Spark 编译并打包上传运行 梦开始的地方
|
2月前
|
分布式计算 大数据 Linux
大数据体系知识学习(二):WordCount案例实现及错误总结
这篇文章介绍了如何使用PySpark进行WordCount操作,包括环境配置、代码实现、运行结果和遇到的错误。作者在运行过程中遇到了Py4JJavaError和JAVA_HOME未设置的问题,并通过导入findspark初始化和设置环境变量解决了这些问题。文章还讨论了groupByKey和reduceByKey的区别。
33 1
|
2月前
|
SQL 分布式计算 大数据
大数据-108 Flink 快速应用案例 重回Hello WordCount!方案1批数据 方案2流数据(一)
大数据-108 Flink 快速应用案例 重回Hello WordCount!方案1批数据 方案2流数据(一)
51 0
|
2月前
|
大数据 流计算
大数据-108 Flink 快速应用案例 重回Hello WordCount!方案1批数据 方案2流数据(二)
大数据-108 Flink 快速应用案例 重回Hello WordCount!方案1批数据 方案2流数据(二)
46 0
|
7月前
|
分布式计算 Java Hadoop
大数据实战——WordCount案例实践
大数据实战——WordCount案例实践
|
机器学习/深度学习 移动开发 算法
大数据分析实验,包含五个子实验:wordCount实验,PageRank实验,关系挖掘实验,k-means算法,推荐系统算法。(下)
大数据分析实验,包含五个子实验:wordCount实验,PageRank实验,关系挖掘实验,k-means算法,推荐系统算法。(下)
301 0
大数据分析实验,包含五个子实验:wordCount实验,PageRank实验,关系挖掘实验,k-means算法,推荐系统算法。(下)
|
分布式计算 算法 搜索推荐
大数据分析实验,包含五个子实验:wordCount实验,PageRank实验,关系挖掘实验,k-means算法,推荐系统算法。(上)
大数据分析实验,包含五个子实验:wordCount实验,PageRank实验,关系挖掘实验,k-means算法,推荐系统算法。
383 0
大数据分析实验,包含五个子实验:wordCount实验,PageRank实验,关系挖掘实验,k-means算法,推荐系统算法。(上)
|
分布式计算 大数据 Hadoop
大数据实验——用Spark实现wordcount单词统计
大数据实验——用Spark实现wordcount单词统计
大数据实验——用Spark实现wordcount单词统计
|
分布式计算 Ubuntu Hadoop
【大数据】Linux下安装Hadoop(2.7.1)详解及WordCount运行
在完成了Storm的环境配置之后,想着鼓捣一下Hadoop的安装,网上面的教程好多,但是没有一个特别切合的,所以在安装的过程中还是遇到了很多的麻烦,并且最后不断的查阅资料,终于解决了问题,感觉还是很好的,下面废话不多说,开始进入正题。
969 0
【大数据】Linux下安装Hadoop(2.7.1)详解及WordCount运行
|
分布式计算 大数据 Linux
大数据||MapReduce之wordcount处理过程
文件分割 将文件拆分成splits,由于测试用的文件较小,所以每个文件为一个split,并将文件按行分割形成对,下图所示。这一步由MapReduce框架自动完成,其中偏移量(即key值)包括了回车所占的字符数(Windows/Linux环境不同)。
1198 0