hadoop streaming多路输出方法和注意点(附超大数据diff对比源码)

本文涉及的产品
云原生大数据计算服务 MaxCompute,5000CU*H 100GB 3个月
云原生大数据计算服务MaxCompute,500CU*H 100GB 3个月
简介:

简介

hadoop 支持reduce多路输出的功能,一个reduce可以输出到多个part-xxxxx-X文件中,其中X是A-Z的字母之一,程序在输出<key,value>对的时候,在value的后面追加"#X"后缀,比如#A,输出的文件就是part-00000-A,不同的后缀可以把key,value输出到不同的文件中,方便做输出类型分类, #X仅仅用做指定输出文件后缀, 不会体现到输出的内容中

使用方法

启动脚本中需要指定-outputformat org.apache.hadoop.mapred.lib.SuffixMultipleTextOutputFormat或者-outputformat org.apache.hadoop.mapred.lib.SuffixMultipleSequenceFileOutputFormat, 输出就会按照多路输出的方式进行分文件输出

所有标准输出的value中都要加上 #X后缀,X代表A-Z, 不然会报invalid suffix错误 

简单示例如下:


$HADOOP_HOME_PATH/bin/hadoop streaming \
      -Dhadoop.job.ugi="$HADOOP_JOB_UGI" \
      -file ./map.sh \
      -file ./red.sh \
      -file ./config.sh \
      -mapper "sh -x map.sh" \
      -reducer "sh -x red.sh" \
      -input $NEW_INPUT_PATH \
      -input $OLD_INPUT_PATH \
      -output  $OUTPUT_PATH \
      -jobconf stream.num.map.output.key.fields=1 \
      -partitioner org.apache.hadoop.mapred.lib.KeyFieldBasedPartitioner \
      -outputformat org.apache.hadoop.mapred.lib.SuffixMultipleTextOutputFormat \
      -jobconf mapred.job.name="test-shapherd-dist-diff" \
      -jobconf mapred.job.priority=HIGH \
      -jobconf mapred.job.map.capacity=100 \
      -jobconf mapred.job.reduce.capacity=100 \
      -jobconf mapred.reduce.tasks=3

在red脚本中可以所以的输出都加上后缀, 这样输出就是分part的了,比如大数据diff对比的脚本


map.sh如下:


source ./config.sh

 

awk 'BEGIN{

}

{

    if(match("'${map_input_file}'","'$OLD_INPUT_PATH'"))

    {

        print $0"\t"0

    next

    }

    if(match("'${map_input_file}'","'$NEW_INPUT_PATH'"))

    print $0"\t"1

}'

 

exit 0

red.sh如下:

awk  -F"\t" 'BEGIN{

    key=""

        flag=0

        num=0

        old_num=0

        new_num=0

        diff_num=0

}

{

    if($NF == "0")

        old_num++

    else

        new_num++

    if($1 != key)

    {

        if(key != "")

        {

            if(num <= 1)

            {

                diff_num++

                if(flag == "0")

                    print $0"#A"

                else

                    print $0"#B"

            }

        }

        key=$1

        flag=$NF

        num=1

        next

    }

 

    if(key == $1)

    {

        num++

        next   

    }

 

}

END{

        if(num  == 1)

        {

            if(flag == "0")

                print $0"#A"

            else

                print $0"#B"

        }

 

        print old_num"\tshapherd#C"

        print new_num"\tshapherd#D"

        print diff_num"\tshapherd#E"

}'

 

 

exit 0

我的两个大数据没有diff, 所以输出就是:

part-00000-C
part-00000-D
part-00000-E
part-00001-C
part-00001-D
part-00001-E
part-00002-C
part-00002-D
part-00002-E

没有A和B结尾的

注意事项

  • 多路输出最多支持26路, 也就是字母只能是A-Z范围。
  • reduce的输入key和value的分隔符默认是\t, 如果输出中没有\t,reduce脚本会把整行当作key, value就是空的,这时如果加了#X,会报invalid suffix错误,因为#X作为了key的一部分,这种问题一种是保证你的key和value是按照\t分隔的, 一种是指定自己想要的分隔符。













相关实践学习
基于MaxCompute的热门话题分析
本实验围绕社交用户发布的文章做了详尽的分析,通过分析能得到用户群体年龄分布,性别分布,地理位置分布,以及热门话题的热度。
SaaS 模式云数据仓库必修课
本课程由阿里云开发者社区和阿里云大数据团队共同出品,是SaaS模式云原生数据仓库领导者MaxCompute核心课程。本课程由阿里云资深产品和技术专家们从概念到方法,从场景到实践,体系化的将阿里巴巴飞天大数据平台10多年的经过验证的方法与实践深入浅出的讲给开发者们。帮助大数据开发者快速了解并掌握SaaS模式的云原生的数据仓库,助力开发者学习了解先进的技术栈,并能在实际业务中敏捷的进行大数据分析,赋能企业业务。 通过本课程可以了解SaaS模式云原生数据仓库领导者MaxCompute核心功能及典型适用场景,可应用MaxCompute实现数仓搭建,快速进行大数据分析。适合大数据工程师、大数据分析师 大量数据需要处理、存储和管理,需要搭建数据仓库?学它! 没有足够人员和经验来运维大数据平台,不想自建IDC买机器,需要免运维的大数据平台?会SQL就等于会大数据?学它! 想知道大数据用得对不对,想用更少的钱得到持续演进的数仓能力?获得极致弹性的计算资源和更好的性能,以及持续保护数据安全的生产环境?学它! 想要获得灵活的分析能力,快速洞察数据规律特征?想要兼得数据湖的灵活性与数据仓库的成长性?学它! 出品人:阿里云大数据产品及研发团队专家 产品 MaxCompute 官网 https://www.aliyun.com/product/odps&nbsp;
目录
相关文章
|
23天前
|
SQL 分布式计算 数据可视化
基于Hadoop的大数据可视化方法
【8月更文第28天】在大数据时代,有效地处理和分析海量数据对于企业来说至关重要。Hadoop作为一个强大的分布式数据处理框架,能够处理PB级别的数据量。然而,仅仅完成数据处理还不够,还需要将这些数据转化为易于理解的信息,这就是数据可视化的重要性所在。本文将详细介绍如何使用Hadoop处理后的数据进行有效的可视化分析,并会涉及一些流行的可视化工具如Tableau、Qlik等。
55 0
|
2月前
|
分布式计算 Hadoop
|
2月前
|
分布式计算 Hadoop 测试技术
Hadoop格式化前备份数据
【7月更文挑战第22天】
82 7
|
2月前
|
存储 分布式计算 Hadoop
hadoop格式化前数据导出
【7月更文挑战第23天】
36 5
|
2月前
|
存储 分布式计算 资源调度
hadoop确认格式化的方法
【7月更文挑战第21天】
33 5
|
2月前
|
分布式计算 Hadoop 关系型数据库
实时计算 Flink版操作报错合集之Hadoop在将文件写入HDFS时,无法在所有指定的数据节点上进行复制,该如何解决
在使用实时计算Flink版过程中,可能会遇到各种错误,了解这些错误的原因及解决方法对于高效排错至关重要。针对具体问题,查看Flink的日志是关键,它们通常会提供更详细的错误信息和堆栈跟踪,有助于定位问题。此外,Flink社区文档和官方论坛也是寻求帮助的好去处。以下是一些常见的操作报错及其可能的原因与解决策略。
|
2月前
|
存储 分布式计算 Hadoop
Hadoop字符串型数据
【7月更文挑战第9天】
38 3
|
2月前
|
存储 JSON 分布式计算
hadoop选择数值型数据
【7月更文挑战第9天】
31 1
|
3月前
|
分布式计算 Hadoop 数据处理
Hadoop数据倾斜的数据特性
【6月更文挑战第21天】
21 1
|
3月前
|
存储 分布式计算 Hadoop
Hadoop数据重分布数据冗余和备份
【6月更文挑战第17天】
52 4

相关实验场景

更多