天气案例:

需求:

找出每个月温度最高的两天

数据集:

1949-10-01 14:21:02 34c

1949-10-01 19:21:02 38c

1949-10-02 14:01:02 36c

1950-01-01 11:21:02 32c

1950-10-01 12:21:02 37c

1951-12-01 12:21:02 23c

1950-10-02 12:21:02 41c

1950-10-03 12:21:02 27c

1951-07-01 12:21:02 45c

1951-07-02 12:21:02 46c

 案例分析:

在MR中,原语是“相同”key的键值对为一组,调用一次reduce方法,方法内迭代这组数据计算。

找出每个月气温最高的两天

分组:

年-组

reduce:年  手动找出每个月的气温数据(创建12个list集合/数组)

日-组

reduce:一个月的所有气温

如何按月分组?

1951-07-03 12:21:03 47c

分组比较器,按key分组

自定义分组比较器:

class MyGroupingComparator  extends  WritableComparator {      int compare(WritableComparable a, WritableComparable b) {           a和b表示两个key进行比较,key要包含月份和年份           return 0 1 -1;      }  }   nextKeyIsSame boolean

排序:

要求在reduce端的一组数据中按温度倒序排序

同年同月的一组

reduce排序,还是map排序?

map端排序,按照温度倒序排序

MapReduce案例-1_30岁老阿姨的博客

实现了环形缓冲区

有一个排序溢写的方法:sortAndSpill

MapReduce案例-1_30岁老阿姨的博客

该方法如何进行排序的?

MapReduce案例-1_30岁老阿姨的博客

注意:sorter.sort,此处默认使用快排进行排序

MapReduce案例-1_30岁老阿姨的博客

默认情况下,sorter就是快排:QuickSort。

MapReduce案例-1_30岁老阿姨的博客

s也就是MapOutputBuffer.this。该对象有一个compare方法,因为上图中有一个s.compare方法

看一下MapOutputBuffer如何实现的compare方法:

MapReduce案例-1_30岁老阿姨的博客

该compare方法首先按照分区号排序,相同分区号的按照key的字典序排序。

而按照key进行排序的时候,使用的是comparator的compare方法

MapReduce案例-1_30岁老阿姨的博客

comparator是谁?

MapReduce案例-1_30岁老阿姨的博客

上图中的方法返回值就是该比较器

上述方法返回的是什么比较器?

ctrl+alt+b

MapReduce案例-1_30岁老阿姨的博客

getOutputKeyComparator方法要么返回我们自定义的,要么返回WritableComparator的get方法返回的比较器。

自己没有设置过,所以肯定是WwritableComparator的get返回值。

如果自己设置,job.setSortComparatorClass(MySortComparator.class)

MapReduce案例-1_30岁老阿姨的博客

如何实现的set?

MapReduce案例-1_30岁老阿姨的博客

MapReduce案例-1_30岁老阿姨的博客

设置的时候用的是setOutputKeyComparatorClass,使用的时候用getOutputKeyComparator方法

MapReduce案例-1_30岁老阿姨的博客

上图中,假如用户自定义的key,则要求该key提供排序比较器。

MapReduce案例-1_30岁老阿姨的博客

该比较器如何提供?

MapReduce案例-1_30岁老阿姨的博客

WritableComparator类的get方法用到了HashMap:comparators,该map的key是MR map输出key的类型:Text.class,value是MR的map输出key类型的比较器对象。comparators中的元素是如何放进去的?何时放进去的?

比如说Text.class

MapReduce案例-1_30岁老阿姨的博客

Text类的静态块负责将Text.class作为key,将Text自己提供的比较器对象作为value调用了一次WritableComparator的define方法。

define做了什么?

MapReduce案例-1_30岁老阿姨的博客

如果自己提供一个key,如何实现?

package com.bjsxt.mr.weather;  import org.apache.hadoop.io.WritableComparable; import org.apache.hadoop.io.WritableComparator;  import java.io.DataInput; import java.io.DataOutput; import java.io.IOException;  public class Weather implements WritableComparable<Weather> {      private Integer year;     private Integer month;     private Integer day;     private Integer temperature;      public Integer getYear() {         return year;     }      public void setYear(Integer year) {         this.year = year;     }      public Integer getMonth() {         return month;     }      public void setMonth(Integer month) {         this.month = month;     }      public Integer getDay() {         return day;     }      public void setDay(Integer day) {         this.day = day;     }      public Integer getTemperature() {         return temperature;     }      public void setTemperature(Integer temperature) {         this.temperature = temperature;     }      @Override     public int compareTo(Weather o) {         return 0;     }      @Override     public void write(DataOutput out) throws IOException {      }      @Override     public void readFields(DataInput in) throws IOException {      }      static class Comparator extends WritableComparator {          public Comparator() {             super(Weather.class, true);         }          @Override         public int compare(WritableComparable a, WritableComparable b) {              Weather wa = (Weather) a;             Weather wb = (Weather) b;              int result = wa.getYear().compareTo(wb.getYear());             if (result == 0) {                 result = wa.getMonth().compareTo(wb.getMonth());                 if (result == 0) {                     // 同年同月的数据,按照温度倒序 //                    result = wa.getTemperature().compareTo(wb.getTemperature());                     result = wb.getTemperature().compareTo(wa.getTemperature());                 }             }              return result;         }     }      static {         WritableComparator.define(Weather.class, new Comparator());     }  } 

MapReduce案例-1_30岁老阿姨的博客

如果第一次没有获取到当前MR的MapOutputKey的比较器,则重新强制执行初始化静态块内容,如果还获取不到,则直接返回WritableComparator对象。

MapReduce案例-1_30岁老阿姨的博客

返回的WritableComparator对象本身给getOutputKeyComparator方法返回了。

MapReduce案例-1_30岁老阿姨的博客

comparator就是返回的这个WritableComparator对象。

MapReduce案例-1_30岁老阿姨的博客

此处的compare方法调用的是哪个?就是WritableComparator的compare方法

MapReduce案例-1_30岁老阿姨的博客

该方法最终要调用compare(key1,key2)方法,也就是:

MapReduce案例-1_30岁老阿姨的博客

该方法又调用了WritableComparable的compareTo方法,也就是:

MapReduce案例-1_30岁老阿姨的博客

该方法做什么用?应该返回什么值?

第二种方式:

package com.bjsxt.mr.weather;  import org.apache.hadoop.io.WritableComparable;  import java.io.DataInput; import java.io.DataOutput; import java.io.IOException;  public class Weather2 implements WritableComparable<Weather2> {      private Integer year;     private Integer month;     private Integer day;     private Integer temperature;      public Integer getYear() {         return year;     }      public void setYear(Integer year) {         this.year = year;     }      public Integer getMonth() {         return month;     }      public void setMonth(Integer month) {         this.month = month;     }      public Integer getDay() {         return day;     }      public void setDay(Integer day) {         this.day = day;     }      public Integer getTemperature() {         return temperature;     }      public void setTemperature(Integer temperature) {         this.temperature = temperature;     }      @Override     public int compareTo(Weather2 that) {          int result = this.getYear().compareTo(that.getYear());         if (result == 0) {             result = this.getMonth().compareTo(that.getMonth());             if (result == 0) {                 // 温度倒序                 result = that.getTemperature().compareTo(this.getTemperature());             }         }         return result;     }      @Override     public void write(DataOutput out) throws IOException {         // 序列化输出         out.writeInt(year);         out.writeInt(month);         out.writeInt(day);         out.writeInt(temperature);     }      @Override     public void readFields(DataInput in) throws IOException {         //反序列化         setYear(in.readInt());         setMonth(in.readInt());         setDay(in.readInt());         setTemperature(in.readInt());     } } 

map端排序比较器:SortComparator

class MySortComparator  extends  WritableComparator { int compare(WritableComparable a, WritableComparable b) {      按照温度倒序排序,但是只有同年同月的数据按照温度倒排才有意义。   按key排序,key需要包含年,月,温度      return 0 1 -1; } } 

key的设计

map端输出key,包含年/月/温度

如何设计key?

是否可以自定义key?

class MyKey implements WritableComparable<MyKey> {       private Integer year;  private Integer month;  private Integer day;  private Integer wenDu;    // 会被排序比较器覆盖 int compareTo(MyKey other) {     this.wendu.compareTo(other.wendu)  this.year.compareTo(other.year) this.month.compareTo(other.month) this.day.compareTo(other.day)      return 0 1 -1 }      }  MyMapper  extends  Mapper<LongWritable, Text, MyKey, Text> {  map{      MyKey mk = new MyKey();         mk.setday   mk.setyear   mk.setmonth   mk.setwendu      context.write(MyKey.obj, value); } } 

分区器

如何分区?

  1. 分区保证同组数据在一起
  2. reduce端负载均衡,数据倾斜

自定义分区器

class MyPartitioner  extends  Partitioner<MyKey, Text> {  int getPartition(key, value, reduceNum) {   return 1 2 3 4 0; } } 

提示一

MapReduce案例-1_30岁老阿姨的博客

1,MR

        *保证原语

        怎样划分数据,怎样定义一组

2,k:v映射的设计

        考虑reduce的计算复杂度

3,能不能多个reduce

        倾斜:抽样

        集群资源情况

4,自定义数据类型

提示二

记录特点

        每年

        每个月

        最高

        2天

        1天多条记录?

进一步思考

        年月分组

        温度升序

        key中要包含时间和温度!

MR原语:相同的key分到一组

        通过GroupCompartor设置分组规则

步骤

自定义数据类型Weather

        包含时间

        包含温度

        自定义排序比较规则

自定义分组比较

        年月相同被视为相同的key

        那么reduce迭代时,相同年月的记录有可能是同一天的,reduce中需要判断是否同一天

        注意OOM

数据量很大

        全量数据可以切分成最少按一个月份的数据量进行判断

        这种业务场景可以设置多个reduce

        通过实现partition