MapReduce中一次reduce方法的调用中key的值不断变化分析及源码解析

本文涉及的产品
云解析 DNS,旗舰版 1个月
全局流量管理 GTM,标准版 1个月
公共DNS(含HTTPDNS解析),每月1000万次HTTP解析
简介:   摘要:mapreduce中执行reduce(KEYIN key, Iterable values, Context context),调用一次reduce方法,迭代value集合时,发现key的值也是在不断变化的,这是因为key的地址在内部会随着value的迭代而不断变化。

  摘要:mapreduce中执行reduce(KEYIN key, Iterable<VALUEIN> values, Context context),调用一次reduce方法,迭代value集合时,发现key的值也是在不断变化的,这是因为key的地址在内部会随着value的迭代而不断变化。

  序:我们知道reduce方法每执行一次,里面我们会通过for循环迭代value的迭代器。如果key是bean的时候,for循环里面value值变化的同时我们的bean值也是会跟随着变化,调用reduce方法时传参数就传了一次key的值,但是在方法内部迭代的时候,key值在变化,那他怎么变动的?

  误区:在map处理完成之后,将所有kv对缓存起来,进行分组,然后传递一个组<key,valus{}>,调用一次reduce方法传入的key和value的迭代器如<hello,{1,1,1,1,1,1.....}>。

 

给一个需求来观察现象

  对日志数据中的上下行流量信息汇总,并输出按照总流量倒序排序的结果,且该需求日志中手机号是不会重复的——即不会存在多条数据,手机号相同,且流量不同,还需要进行多条数据的汇总。

数据如下:

13888888801,1,9,10
13888888802,5,5,10
13888888803,2,7,9
13888888804,4,6,10
13888888805,6,4,10
13888888806,1,0,1

分析

  基本思路:实现自定义的bean来封装流量信息,并将bean作为map输出的key来传输。

  MR程序在处理数据的过程中会对数据排序(map输出的kv对传输到reduce之前,会排序),排序的依据是map输出的key,所以,我们如果要实现自己需要的排序规则,则可以考虑将排序因素放到key中,让key实现接口:WritableComparable,然后重写key的compareTo方法。

package cn.intsmaze.flowsum.SortBean;
public class FlowBeanOne implements WritableComparable<FlowBeanOne> {

    private long upFlow;
    private long dFlow;
    private long sumFlow;
    private long phone;
    
    // 序列化框架在反序列化操作创建对象实例时会调用无参构造
    public FlowBeanOne() {
    }

    // 序列化方法
    @Override
    public void write(DataOutput out) throws IOException {
        out.writeLong(upFlow);
        out.writeLong(dFlow);
        out.writeLong(sumFlow);
        out.writeLong(phone);
    }

    // 反序列化方法,注意: 字段的反序列化顺序与序列化时的顺序保持一致
    @Override
    public void readFields(DataInput in) throws IOException {
        this.upFlow = in.readLong();
        this.dFlow = in.readLong();
        this.sumFlow = in.readLong();
        this.phone = in.readLong();
    }
    
    public void set(long phone,long upFlow, long dFlow) {
        this.phone=phone;
        this.upFlow = upFlow;
        this.dFlow = dFlow;
        this.sumFlow = upFlow + dFlow;
    }

    @Override
    public String toString() {
        return upFlow + "\t" + dFlow + "\t" + sumFlow+ "\t" + phone;
    }
  
//自定义倒序比较规则,总流量相同视为同一个key. @Override
public int compareTo(FlowBeanOne o) { return (int)(o.getSumFlow() - this.sumFlow); } get,set...... }

代码实现如下:

 
 
package cn.intsmaze.flowsum.SortBean;
/**
* 实现流量汇总并且按照流量大小倒序排序 * 前提:处理的数据是已经汇总过的结果文件,然后再次对该文件进行排序 * @author */ public class FlowSumSort { public static class FlowSumSortMapperOne extends Mapper<LongWritable, Text, FlowBeanOne, Text> { FlowBeanOne k = new FlowBeanOne(); Text v = new Text(); @Override protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { String line = value.toString(); String[] fields = line.split(","); long phoneNbr = Long.parseLong(fields[0]); long upFlowSum = Long.parseLong(fields[1]); long dFlowSum = Long.parseLong(fields[2]); k.set(phoneNbr,upFlowSum, dFlowSum);//这里对bean作为key。 context.write(k, v); } } public static class FlowSumSortReducerOne extends Reducer<FlowBeanOne, Text, Text, FlowBeanOne> {
@Override
protected void reduce(FlowBeanOne bean, Iterable<Text> phoneNbrs, Context context) throws IOException, InterruptedException { System.out.println("-------------------"); for (Text text : phoneNbrs) { System.out.println(bean); context.write(text, bean); } } } public static void main(String[] args) throws Exception { Configuration conf = new Configuration(); Job job = Job.getInstance(conf); job.setJarByClass(FlowSumSort.class); // 告诉框架,我们的程序所用的mapper类和reducer类 job.setMapperClass(FlowSumSortMapperOne.class); job.setReducerClass(FlowSumSortReducerOne.class); job.setMapOutputKeyClass(FlowBeanOne.class); job.setMapOutputValueClass(Text.class); // 告诉框架,我们的mapperreducer输出的数据类型 job.setOutputKeyClass(Text.class); job.setOutputValueClass(FlowBeanOne.class); // 告诉框架,我们要处理的文件在哪个路径下 FileInputFormat.setInputPaths(job, new Path("d:/intsmaze/input/")); // 告诉框架,我们的处理结果要输出到哪里去 FileOutputFormat.setOutputPath(job, new Path("d:/intsmaze/output/")); boolean res = job.waitForCompletion(true); System.exit(res ? 0 : 1); } }

  这里要注意,因为是汇总排序,所以reduce的并行度必须为1,。除了使用框架的组件外,我们还可以通过使用reduce的cleanup方法,自己在reduce端对收集到的数据进行汇总排序。

输出的结果确实是我们想要的结果: 
    6    4    10    13888888805
    4    6    10    13888888804
    5    5    10    13888888802
    1    9    10    13888888801
    2    7    9    13888888803
    1    0    1    13888888806
但是观察我们在控制台打印的信息:
-------------------
6    4    10    13888888805
4    6    10    13888888804
5    5    10    13888888802
1    9    10    13888888801
-------------------
2    7    9    13888888803
-------------------
1    0    1    13888888806

灵异现象

  执行job代码后,我们发现reduce任务中的reduce()方法只被调用了三次,参数key只被传入了三次,但是观察发现,key在一次reduce方法的调用中值是不断变化的,这有是怎么回事?
  我们重写的reduce方法如下:看参数确实是传入一个key以及key对应的value的迭代器集合,其实这个方法的参数只是一个误导,key值会随着value的迭代而不断的变化。reduce端的reduce方法接到map传来的数据并不是我们根据参数类型而认为的<hello,{1,1,1,1,1,1.....}>而是<hello,1>,<hello,1>,<hello,1>,<hello,1>......。
 protected void reduce(KEYIN key, Iterable<VALUEIN> values, Context context
                        ) throws IOException, InterruptedException {
    for(VALUEIN value: values) {
      context.write((KEYOUT) key, (VALUEOUT) value);
    }
  }

来看看hadoop2.6.4源码解析吧:

因为这个问题是一年前遇到的,看完源码搞明白后,并没有时间去整理,所以再次解析有所不足。

Reducer源码解析

public class Reducer<KEYIN,VALUEIN,KEYOUT,VALUEOUT> {
  public abstract class Context 
    implements ReduceContext<KEYIN,VALUEIN,KEYOUT,VALUEOUT> {
  }

  /**
   * 这个方法我们不需要管,因为我们实现的类重写了该方法。
   */
  protected void reduce(KEYIN key, Iterable<VALUEIN> values, Context context
                        ) throws IOException, InterruptedException {
    for(VALUEIN value: values) {
      context.write((KEYOUT) key, (VALUEOUT) value);
    }
  }

  //通过debug我们可以看到,数据在结束map任务执行reduce任务的时候,reduce端会先调用这个方法,而调用这个
  //方法的类是我们实现的reduce类,通过继承调用该方法,然后在该方法里面调用我们实现类重写的reduce方法。
  public void run(Context context) throws IOException, InterruptedException {
    setup(context);
    try {
      while (context.nextKey()) {//这个地方调用ReduceContextImpl的方法进行判断
        reduce(context.getCurrentKey(), context.getValues(), context);//这个地方调用我们的实现类的reduce方法走我们的逻辑代码了
        // If a back up store is used, reset it
        Iterator<VALUEIN> iter = context.getValues().iterator();
        if(iter instanceof ReduceContext.ValueIterator) {
          ((ReduceContext.ValueIterator<VALUEIN>)iter).resetBackupStore();        
        }
      }
    } finally {
      cleanup(context);
    }
  }
}

ReduceContextImpl源码解析

(由于代码太多,我只截取了部分主要的代码)

public class ReduceContextImpl {
  private RawKeyValueIterator input;//这个迭代器里面存储的key-value对元素。
  private KEYIN key;                                  // current key
  private VALUEIN value;                              // current value
  private boolean firstValue = false;                 // first value in key
  private boolean nextKeyIsSame = false;              // more w/ this key
  private boolean hasMore;                            // more in file
  private ValueIterable iterable = new ValueIterable();//访问自己的内部类 
  
  public ReduceContextImpl() throws InterruptedException, IOException{
    hasMore = input.next();//对象创建的时候,就先判断reduce接收的key-value迭代器是否有元素,并获取下一个元素
  }
  /** 创建完成就调用该方法 ,开始处理下一个唯一的key*/
  public boolean nextKey() throws IOException,InterruptedException {
    while (hasMore && nextKeyIsSame) {
    //判断迭代器是否还有下一个元素已经下一个元素是否和上一个已经遍历出来的key-value元素的key是不是一样
      nextKeyValue();
    }
    if (hasMore) {
      if (inputKeyCounter != null) {
        inputKeyCounter.increment(1);
      }
      return nextKeyValue();
    } else {
      return false;
    }
  }
  /**
   * Advance to the next key/value pair.
   */
  @Override
  public boolean nextKeyValue() throws IOException, InterruptedException {
    if (!hasMore) {
      key = null;
      value = null;
      return false;
    }
    firstValue = !nextKeyIsSame;
    
    //获取迭代器下一个元素的key
    DataInputBuffer nextKey = input.getKey();
    //设置当前key的坐标
    currentRawKey.set(nextKey.getData(), nextKey.getPosition(), 
                      nextKey.getLength() - nextKey.getPosition());
    buffer.reset(currentRawKey.getBytes(), 0, currentRawKey.getLength());
    
    //反序列化得到当前key对象
    key = keyDeserializer.deserialize(key);
    //获取迭代器下一个元素的value
    DataInputBuffer nextVal = input.getValue();
    buffer.reset(nextVal.getData(), nextVal.getPosition(), nextVal.getLength()
        - nextVal.getPosition());
        
    //反序列化value
    value = valueDeserializer.deserialize(value);
    currentKeyLength = nextKey.getLength() - nextKey.getPosition();
    currentValueLength = nextVal.getLength() - nextVal.getPosition();
    if (isMarked) {
        //存储下一个key和value
      backupStore.write(nextKey, nextVal);
    }
    
    //迭代器向下迭代一次
    hasMore = input.next();
    //如果还有元素,则进行比较,判断key是否相同
    if (hasMore) {
      nextKey = input.getKey();
      //这个地方也是比较关键的:
      nextKeyIsSame = comparator.compare(currentRawKey.getBytes(), 0, 
                                     currentRawKey.getLength(),
                                     nextKey.getData(),
                                     nextKey.getPosition(),
                                     nextKey.getLength() - nextKey.getPosition()
                                         ) == 0;
    } else {
      nextKeyIsSame = false;
    }
    
    inputValueCounter.increment(1);
    return true;
  }
  
  //一个迭代器模式的内部类
  protected class ValueIterator implements ReduceContext.ValueIterator<VALUEIN> {
    private boolean inReset = false;
    private boolean clearMarkFlag = false;
    @Override//它并不仅仅是判断迭代器是否还有下一个元素,而且还要判断下一个元素和上一个元素是不是相同的key
    public boolean hasNext() {
        if (inReset && backupStore.hasNext()) {
          return true;
        } 
      return firstValue || nextKeyIsSame;
    }
    @Override
    //这个地方要注意了,其实在获取下一个元素的时候主要调用的是nextKeyValue();
    public VALUEIN next() {
      if (inReset) {
          if (backupStore.hasNext()) {
            backupStore.next();
            DataInputBuffer next = backupStore.nextValue();
            buffer.reset(next.getData(), next.getPosition(), next.getLength()
                - next.getPosition());
            value = valueDeserializer.deserialize(value);
            return value;
          } else {
            inReset = false;
            backupStore.exitResetMode();
            if (clearMarkFlag) {
              clearMarkFlag = false;
              isMarked = false;
            }
          }
      } 
      // if this is the first record, we don't need to advance
      if (firstValue) {
        firstValue = false;
        return value;
      }
      // otherwise, go to the next key/value pair
    nextKeyValue();//该方法就是获取下一个key,value对,key值的变化也就在这里表现出来了。
    return value;
    }
  }
  
  //内部类,实现迭代器,具备迭代器功能
  protected class ValueIterable implements Iterable<VALUEIN> {
    private ValueIterator iterator = new ValueIterator();
    @Override
    public Iterator<VALUEIN> iterator() {
      return iterator;
    } 
  }
  public  Iterable<VALUEIN> getValues() throws IOException, InterruptedException {
    return iterable;
  }
}

  简单一句话总结就是:ReduceContextImpl类的RawKeyValueIterator input迭代器对象里面存储中着key-value对的元素, 以及一个只存储value的迭代器,然后每调一次我们实现的reduce方法,就是传入ValueIterable迭代器对象和当前的key。但是我们在方法里面调用迭代器的next方法时,其实调用了nextKeyValue,来获取下一个key和value,并判断下一个key是否和 上一个key是否相同,然后决定hashNext方法是否结束,同时对key进行了一次重新赋值。

  这个方法获取KV的迭代器的下一个KV值,然后把K值和V值放到之前传入我们自己写的Reduce类的方法中哪个输入参数的地址上,白话说:框架调用我们写的reduce方法时,传入了三个参数,然后我们方法内部调用phoneNbrs.hashNext方法就是调用的ReduceContextImpl的内部类ValueIterator的hashNext方法,这个方法里面调用了ReduceContextImpl内的nextKeyValue方法,该方法内部又清除了之前调用用户自定义reduce方法时传入的k,v参数的内存地址的数据,然后获取了RawKeyValueIterator input迭代器的下一个KV值,然后把k值和V值放入该数据。这就是原因了。

 再看我们的reduce实现类

    public static class FlowSumSortReducerOne extends Reducer<FlowBeanOne, Text, Text, FlowBeanOne> {
        
        @Override
        protected void reduce(FlowBeanOne bean, Iterable<Text> phoneNbrs, Context context) throws IOException, InterruptedException {
            System.out.println("-------------------");
            for (Text text : phoneNbrs) {//这里就是迭代器,相当于调用ValueIterable.hashNext
                System.out.println(bean);
                context.write(text, bean);
            }
        }
    }

   最近实在是不知道学点什么了呦,就把hadoop回顾一下,当初学时,为了快速上手,都是记各种理论以及结论,没有时间去看源码验证,也不知道人家说的结论是否正确,这次回滚就是看源码验证当初结论的正确性。这也快一年没有用了,最近一直从事分布式实时计算的研究。

                           

作者: intsmaze(刘洋)
老铁,你的--->推荐,--->关注,--->评论--->是我继续写作的动力。
微信公众号号:Apache技术研究院
由于博主能力有限,文中可能存在描述不正确,欢迎指正、补充!
本文版权归作者和博客园共有,欢迎转载,但未经作者同意必须保留此段声明,且在文章页面明显位置给出原文连接,否则保留追究法律责任的权利。
相关文章
|
6天前
|
监控 安全 网络安全
深入解析PDCERF:网络安全应急响应的六阶段方法
PDCERF是网络安全应急响应的六阶段方法,涵盖准备、检测、抑制、根除、恢复和跟进。本文详细解析各阶段目标与操作步骤,并附图例,助读者理解与应用,提升组织应对安全事件的能力。
133 89
|
21天前
|
数据可视化 项目管理
个人和团队都好用的年度复盘工具:看板与KPT方法解析
本文带你了解高效方法KPT复盘法(Keep、Problem、Try),结合看板工具,帮助你理清头绪,快速完成年度复盘。
82 7
个人和团队都好用的年度复盘工具:看板与KPT方法解析
|
5天前
|
人工智能 监控 数据可视化
提升开发效率:看板方法的全面解析
随着软件开发复杂度提升,并行开发模式下面临资源分配不均、信息传递延迟及缺乏全局视图等瓶颈问题。看板工具通过任务状态实时可视化、流量效率监控和任务依赖管理,帮助团队直观展示和解决这些瓶颈。未来,结合AI预测和自动化优化,看板工具将更高效地支持并行开发,成为驱动协作与创新的核心支柱。
|
28天前
|
存储 设计模式 算法
【23种设计模式·全精解析 | 行为型模式篇】11种行为型模式的结构概述、案例实现、优缺点、扩展对比、使用场景、源码解析
行为型模式用于描述程序在运行时复杂的流程控制,即描述多个类或对象之间怎样相互协作共同完成单个对象都无法单独完成的任务,它涉及算法与对象间职责的分配。行为型模式分为类行为模式和对象行为模式,前者采用继承机制来在类间分派行为,后者采用组合或聚合在对象间分配行为。由于组合关系或聚合关系比继承关系耦合度低,满足“合成复用原则”,所以对象行为模式比类行为模式具有更大的灵活性。 行为型模式分为: • 模板方法模式 • 策略模式 • 命令模式 • 职责链模式 • 状态模式 • 观察者模式 • 中介者模式 • 迭代器模式 • 访问者模式 • 备忘录模式 • 解释器模式
【23种设计模式·全精解析 | 行为型模式篇】11种行为型模式的结构概述、案例实现、优缺点、扩展对比、使用场景、源码解析
|
28天前
|
设计模式 存储 安全
【23种设计模式·全精解析 | 创建型模式篇】5种创建型模式的结构概述、实现、优缺点、扩展、使用场景、源码解析
结构型模式描述如何将类或对象按某种布局组成更大的结构。它分为类结构型模式和对象结构型模式,前者采用继承机制来组织接口和类,后者釆用组合或聚合来组合对象。由于组合关系或聚合关系比继承关系耦合度低,满足“合成复用原则”,所以对象结构型模式比类结构型模式具有更大的灵活性。 结构型模式分为以下 7 种: • 代理模式 • 适配器模式 • 装饰者模式 • 桥接模式 • 外观模式 • 组合模式 • 享元模式
【23种设计模式·全精解析 | 创建型模式篇】5种创建型模式的结构概述、实现、优缺点、扩展、使用场景、源码解析
|
28天前
|
设计模式 存储 安全
【23种设计模式·全精解析 | 创建型模式篇】5种创建型模式的结构概述、实现、优缺点、扩展、使用场景、源码解析
创建型模式的主要关注点是“怎样创建对象?”,它的主要特点是"将对象的创建与使用分离”。这样可以降低系统的耦合度,使用者不需要关注对象的创建细节。创建型模式分为5种:单例模式、工厂方法模式抽象工厂式、原型模式、建造者模式。
【23种设计模式·全精解析 | 创建型模式篇】5种创建型模式的结构概述、实现、优缺点、扩展、使用场景、源码解析
|
4天前
|
自然语言处理 数据处理 索引
mindspeed-llm源码解析(一)preprocess_data
mindspeed-llm是昇腾模型套件代码仓,原来叫"modelLink"。这篇文章带大家阅读一下数据处理脚本preprocess_data.py(基于1.0.0分支),数据处理是模型训练的第一步,经常会用到。
16 0
|
6天前
|
人工智能 供应链 搜索推荐
中国CRM市场深度分析:主流供应商排名与特点解析
随着中国企业数字化转型的深入,CRM(客户关系管理)软件市场迅速发展,形成了多个优秀解决方案提供商。销售易、纷享销客、明源云客、金蝶云之家、简道云、红圈营销和爱客CRM等供应商各具特色。销售易在大型企业市场表现突出,提供全链路营销销售一体化及强大的AI能力;纷享销客以易用性和高性价比著称,适合中小企业;明源云客专注房地产行业,提供全流程解决方案;金蝶云之家与ERP系统深度整合,适合传统制造业;简道云是低代码平台,灵活性高;红圈营销专注零售业,支持全渠道营销;爱客CRM则主打智能营销功能。企业在选择CRM时需综合考虑实施难度、价格定位、技术支持等因素,并结合自身需求进行试用和调研,确保选择最适合
|
3月前
|
分布式计算 资源调度 Hadoop
Hadoop-10-HDFS集群 Java实现MapReduce WordCount计算 Hadoop序列化 编写Mapper和Reducer和Driver 附带POM 详细代码 图文等内容
Hadoop-10-HDFS集群 Java实现MapReduce WordCount计算 Hadoop序列化 编写Mapper和Reducer和Driver 附带POM 详细代码 图文等内容
145 3
|
7月前
|
分布式计算 Hadoop Java
Hadoop MapReduce编程
该教程指导编写Hadoop MapReduce程序处理天气数据。任务包括计算每个城市ID的最高、最低气温、气温出现次数和平均气温。在读取数据时需忽略表头,且数据应为整数。教程中提供了环境变量设置、Java编译、jar包创建及MapReduce执行的步骤说明,但假设读者已具备基础操作技能。此外,还提到一个扩展练习,通过分区功能将具有相同尾数的数字分组到不同文件。
74 1

热门文章

最新文章

推荐镜像

更多