天气案例:
需求:
找出每个月温度最高的两天
数据集:
1949-10-01 14:21:02 34c
1949-10-01 19:21:02 38c
1949-10-02 14:01:02 36c
1950-01-01 11:21:02 32c
1950-10-01 12:21:02 37c
1951-12-01 12:21:02 23c
1950-10-02 12:21:02 41c
1950-10-03 12:21:02 27c
1951-07-01 12:21:02 45c
1951-07-02 12:21:02 46c
案例分析:
在MR中,原语是“相同”key的键值对为一组,调用一次reduce方法,方法内迭代这组数据计算。
找出每个月气温最高的两天
分组:
年-组
reduce:年 手动找出每个月的气温数据(创建12个list集合/数组)
日-组
月–组
reduce:一个月的所有气温
如何按月分组?
1951-07-03 12:21:03 47c
分组比较器,按key分组
自定义分组比较器:
class MyGroupingComparator extends WritableComparator { int compare(WritableComparable a, WritableComparable b) { a和b表示两个key进行比较,key要包含月份和年份 return 0 1 -1; } } nextKeyIsSame boolean
排序:
要求在reduce端的一组数据中按温度倒序排序
同年同月的一组
reduce排序,还是map排序?
map端排序,按照温度倒序排序
实现了环形缓冲区
有一个排序溢写的方法:sortAndSpill
该方法如何进行排序的?
注意:sorter.sort,此处默认使用快排进行排序
默认情况下,sorter就是快排:QuickSort。
s也就是MapOutputBuffer.this。该对象有一个compare方法,因为上图中有一个s.compare方法
看一下MapOutputBuffer如何实现的compare方法:
该compare方法首先按照分区号排序,相同分区号的按照key的字典序排序。
而按照key进行排序的时候,使用的是comparator的compare方法
comparator是谁?
上图中的方法返回值就是该比较器
上述方法返回的是什么比较器?
ctrl+alt+b
getOutputKeyComparator方法要么返回我们自定义的,要么返回WritableComparator的get方法返回的比较器。
自己没有设置过,所以肯定是WwritableComparator的get返回值。
如果自己设置,job.setSortComparatorClass(MySortComparator.class)
如何实现的set?
设置的时候用的是setOutputKeyComparatorClass,使用的时候用getOutputKeyComparator方法
上图中,假如用户自定义的key,则要求该key提供排序比较器。
该比较器如何提供?
WritableComparator类的get方法用到了HashMap:comparators,该map的key是MR map输出key的类型:Text.class,value是MR的map输出key类型的比较器对象。comparators中的元素是如何放进去的?何时放进去的?
比如说Text.class
Text类的静态块负责将Text.class作为key,将Text自己提供的比较器对象作为value调用了一次WritableComparator的define方法。
define做了什么?
如果自己提供一个key,如何实现?
package com.bjsxt.mr.weather; import org.apache.hadoop.io.WritableComparable; import org.apache.hadoop.io.WritableComparator; import java.io.DataInput; import java.io.DataOutput; import java.io.IOException; public class Weather implements WritableComparable<Weather> { private Integer year; private Integer month; private Integer day; private Integer temperature; public Integer getYear() { return year; } public void setYear(Integer year) { this.year = year; } public Integer getMonth() { return month; } public void setMonth(Integer month) { this.month = month; } public Integer getDay() { return day; } public void setDay(Integer day) { this.day = day; } public Integer getTemperature() { return temperature; } public void setTemperature(Integer temperature) { this.temperature = temperature; } @Override public int compareTo(Weather o) { return 0; } @Override public void write(DataOutput out) throws IOException { } @Override public void readFields(DataInput in) throws IOException { } static class Comparator extends WritableComparator { public Comparator() { super(Weather.class, true); } @Override public int compare(WritableComparable a, WritableComparable b) { Weather wa = (Weather) a; Weather wb = (Weather) b; int result = wa.getYear().compareTo(wb.getYear()); if (result == 0) { result = wa.getMonth().compareTo(wb.getMonth()); if (result == 0) { // 同年同月的数据,按照温度倒序 // result = wa.getTemperature().compareTo(wb.getTemperature()); result = wb.getTemperature().compareTo(wa.getTemperature()); } } return result; } } static { WritableComparator.define(Weather.class, new Comparator()); } }
如果第一次没有获取到当前MR的MapOutputKey的比较器,则重新强制执行初始化静态块内容,如果还获取不到,则直接返回WritableComparator对象。
返回的WritableComparator对象本身给getOutputKeyComparator方法返回了。
comparator就是返回的这个WritableComparator对象。
此处的compare方法调用的是哪个?就是WritableComparator的compare方法
该方法最终要调用compare(key1,key2)方法,也就是:
该方法又调用了WritableComparable的compareTo方法,也就是:
该方法做什么用?应该返回什么值?
第二种方式:
package com.bjsxt.mr.weather; import org.apache.hadoop.io.WritableComparable; import java.io.DataInput; import java.io.DataOutput; import java.io.IOException; public class Weather2 implements WritableComparable<Weather2> { private Integer year; private Integer month; private Integer day; private Integer temperature; public Integer getYear() { return year; } public void setYear(Integer year) { this.year = year; } public Integer getMonth() { return month; } public void setMonth(Integer month) { this.month = month; } public Integer getDay() { return day; } public void setDay(Integer day) { this.day = day; } public Integer getTemperature() { return temperature; } public void setTemperature(Integer temperature) { this.temperature = temperature; } @Override public int compareTo(Weather2 that) { int result = this.getYear().compareTo(that.getYear()); if (result == 0) { result = this.getMonth().compareTo(that.getMonth()); if (result == 0) { // 温度倒序 result = that.getTemperature().compareTo(this.getTemperature()); } } return result; } @Override public void write(DataOutput out) throws IOException { // 序列化输出 out.writeInt(year); out.writeInt(month); out.writeInt(day); out.writeInt(temperature); } @Override public void readFields(DataInput in) throws IOException { //反序列化 setYear(in.readInt()); setMonth(in.readInt()); setDay(in.readInt()); setTemperature(in.readInt()); } }
map端排序比较器:SortComparator
class MySortComparator extends WritableComparator { int compare(WritableComparable a, WritableComparable b) { 按照温度倒序排序,但是只有同年同月的数据按照温度倒排才有意义。 按key排序,key需要包含年,月,温度 return 0 1 -1; } }
key的设计
map端输出key,包含年/月/温度
如何设计key?
是否可以自定义key?
class MyKey implements WritableComparable<MyKey> { private Integer year; private Integer month; private Integer day; private Integer wenDu; // 会被排序比较器覆盖 int compareTo(MyKey other) { this.wendu.compareTo(other.wendu) this.year.compareTo(other.year) this.month.compareTo(other.month) this.day.compareTo(other.day) return 0 1 -1 } } MyMapper extends Mapper<LongWritable, Text, MyKey, Text> { map{ MyKey mk = new MyKey(); mk.setday mk.setyear mk.setmonth mk.setwendu context.write(MyKey.obj, value); } }
分区器
如何分区?
- 分区保证同组数据在一起
- reduce端负载均衡,数据倾斜
自定义分区器
class MyPartitioner extends Partitioner<MyKey, Text> { int getPartition(key, value, reduceNum) { return 1 2 3 4 0; } }
提示一
1,MR
*保证原语
怎样划分数据,怎样定义一组
2,k:v映射的设计
考虑reduce的计算复杂度
3,能不能多个reduce
倾斜:抽样
集群资源情况
4,自定义数据类型
提示二
记录特点
每年
每个月
最高
2天
1天多条记录?
进一步思考
年月分组
温度升序
key中要包含时间和温度!
MR原语:相同的key分到一组
通过GroupCompartor设置分组规则
步骤
自定义数据类型Weather
包含时间
包含温度
自定义排序比较规则
自定义分组比较
年月相同被视为相同的key
那么reduce迭代时,相同年月的记录有可能是同一天的,reduce中需要判断是否同一天
注意OOM
数据量很大
全量数据可以切分成最少按一个月份的数据量进行判断
这种业务场景可以设置多个reduce
通过实现partition