zl程序教程

您现在的位置是:首页 >  数据库

当前栏目

hadoop中分布式计算怎么实现,hadoop四大框架

2023-03-07 09:47:17 时间

第一条。介绍1.1 InputFormat选择读取数据源的方式1.2 OutputFormat选择输出数据的方式2、InputFormat输入2.1切片和map之间的并行性确定机制Task 2.2.1数据块和切片2.2.1.1块2.2.1.2切片2.2结论2.2作业提交过程源码和切片源码详细说明2.3 TextInputFormat2.3.1概述2.3.2示例2.4 CombineTextInputFormat切片机制2.4.1问题解决2.4.2应用MapReduce工作流3.1客户端3.2 yarn 3.3 map task 3.4 shuffle 3.5 maapp master 3.6 reduce task 3.7输出格式4, 混洗机制4.1 shuffle4.2分区分区4.2.1默认哈希分区4.2.1.1分区号2哈希分区4.2.1.2使用默认分区规则分区号1 4.2.2自定义分区步骤4.3分区案例实践4.3.1需求4.3.2增加点数面积4.3.3结果4.3.4 MapReduce分区号和减少数4.4可写可比排序4.4.1概念4.4.2分类4.5可写可比排序案例实践全排序4.5.1需求4.5.2需求分析4 代码编写4.6.3结果4.7可写可比排序案例练习区间排序4.7.1要求4.7.2代码编写4.7.3结果4.8合并器合并4.8.1位置4.8.2条件4.8 .3操作4.8.4好处4.8.5描述4.9合并器合并案例练习4.9.1模式14.9.2模式24.9.3结果4.9.4实现合并器的例程5, OutputFormat数据输出5.1 OutputFormat接口实现类5.1.1位置5.1.2自定义OutputFormat 5.2自定义OutputFormat案例练习5.2.1需求分析5.2.2代码编写5.2.3最终结果5.3 MapTask工作机制5.3.1图5.3.2 5阶段5.4减少任务的工作机制5.4.1图5.4.2三阶段5.5并行性决策机制5.6 Map Task减少任务源码分析6 Join应用6.1 Reduce Join6.1.1需求分析6.1.2问题6.2 Reduce Join案例练习6.3 Map Join6.3.1需求分析6.4 Map Join案例练习7、数据清洗ETL7.1概念7.2需求分析7.3代码编写7.4结果8、MapReduce核心框架原理总结

1.介绍

1.1 InputFormat选择读取数据源的方式。默认情况下,key是逐行读取的,offset值是一行数据。您可以选择InputFormat的实现类FileInputFormat和TextInputFormat来更改读取数据的方式。

1.2 OutputFormat选择输出数据的方式默认是输出到一个文件,key是offset value是一行数据,可以选择OutputFormat的实现类DBOutputFormaat将数据写入数据库hbase和es。

2.输入格式数据输入2.1切片和映射之间的并行度任务并行度确定机制映射任务映射任务数

许多MapTask并行处理同一批数据。

并行太少,速度慢,并行太多。拆分任务的时间可能大于处理速度的时间,浪费资源。2.2.1数据块和切片2.2.1.1数据块将数据物理地切割成文件块,这些文件块是hdfs的存储单元。

示例300M文件块在hdfs中被分成三个块,只有一个副本。

数据块存储大小datanode1存储0-128Mdatanode2存储128-256Mdatanode3存储256-300M2.2.1.2切片。从逻辑上讲,对输入数据进行切片并不会对其进行切片。存储是MapReduce程序计算输入数据的单位。一个切片将启动一个MapReduce。

例子

如果切片大小为100M

切片数据块读取模式数据块服务器0-100M in

一台服务器上读取datanode1100-200M在两台服务器上读取需要跨节点通讯合并文件datanode1和datanode2200-300M在两台服务器上读取需要跨节点通讯合并文件datanode2和datanode3

如果切片大小为128M

切片数据块读取方式数据块服务器0-128M在一台服务器上读取datanode1128-256M在一台服务器上读取datanode2256-300M在一台服务器上读取datanode3

在本地处理数据速度最快节点距离最近。所以在Hadoop底层切片大小默认等于数据块大小。

2.2.2 结论 一个Job的Map阶段由提交Job时的切片数决定每一个切片分配一个MapTask默认情况切片大小块大小128M 后续会讲如何修改 切片时不考虑数据整体而是对每一个文件单独切片每个文件不管是大是小都是按照块大小单独切片 2.2 Job提交流程源码和切片源码详解

Job提交流程源码详解 链接: 大数据—Hadoop十一_ MapReduce_04、核心框架原理_源码1_ Job提交流程

Job提交流程源码总结

建立连接 判断是本地环境还是yarn环境提交代码 创建stag路径用于临时缓存文件 创建JobID把这个ID放在路径下面 如果是集群模式会把jar拷贝给集群 开始切片生产切片文件.split 提交.xml文件 提交作业状态从DEFINE变成RUNNING

切片机制源码总结

获取本地数据的存储目录依次遍历目录下的文件 对每个文件单独切片遍历第一个文件 获取文件大小 计算切片大小long splitSize computeSplitSize(blockSize, minSize, maxSize); 逻辑blockSize本地32M集群128M, minSize1, maxSizeLong的最大值三个数据的中间值默认情况下块大小 切片大小 如果块大小想改大调整minSize 如果块大小想改小调整maxSize 生产环境一般只会调大不调小开始切片数据每切一片都要判断剩余部分是否大于块大小*1.1不大于按照一块计算 文件大小文件切几块切几片128.将切片信息写入文件整个核心过程在getSplit()中完成最终InputSplit只记录了切片的元数据信息比如起始位置、长度不会真正对数据做改变将切片规划提交到yarn上MrAppMaster会根据切片数开启对应的MapTask个数 2.3 TextInputFormat 2.3.1 概述

TextInputFormat是Hadoop默认的InputFormat是FileInputFormat的实现类

FileInputFormat常见的接口实现类包括TextInputFormat、KeyValueTextInputFormat、NLineInputFormat一次读取多行、CombineTextInputFormat一次读取多个文件处理小文件问题和自定义InputFormat等。

键是存储该行在整个文件中的起始字节偏移量 LongWritable类型 值是这行的内容不包括任何行终止符换行符和回车符 Text类型

2.3.2 示例 输入数据Rich learning formIntelligent learning engineLearning more convenientFrom the real demand for more close to the enterprise 输出数据key,value(0,Rich learning form)(20,Intelligent learning engine)(49,Learning more convenient)(74,From the real demand for more close to the enterprise)2.4 CombineTextInputFormat切片机制 2.4.1 解决问题

默认的TextInputformat是针对文件规划切片不管文件多小都会成为一个个单独的切片开启一个MapTask。 如果小文件较多则会有多个MapTask效率较低

2.4.2 应用场景

将多个小文件从逻辑上划分到一个切片中交给1个或者多个MapTask处理数据

控制切片大小

修改虚拟存储切片最大值CombineTextInputFormat.setMaxInputSplitSize(job,); // / 1024 / 1024 4m指最终被MapTask读取的文件不能小于4M

执行过程

a、对多个小文件按照字典序排序

文件名文件大小a.txt1.7Mb.txt6Mc.txt3Md.txt7M

b、小文件超过4M则进行拆分

文件名文件大小a.txt1.7Mb.txt6Mb1.txt3Mb2.txt3Mc.txt3Md.txt7Md1.txt3.5Md2.txt3.5M

c、两两相加超过4M立即合并

切片1

文件名文件大小a.txt1.7Mb1.txt3M

切片2

文件名文件大小b2.txt3Mc.txt3M

切片3

文件名文件大小d1.txt3.5Md2.txt3.5M2.5 CombineTextInputFormat案例实操

读取四个小文件执行WordCount。如果不做任何处理默认是有4个MapTask1个ReduceTask

日志 number of splits:4 4个MapTask1个ReduceTask

2.5.1 实现1 // 如果不设置InputFormat它默认用的是TextInputFormat.classjob.setInputFormatClass(CombineTextInputFormat.class);//虚拟存储切片最大值设置4mCombineTextInputFormat.setMaxInputSplitSize(job,);

日志 number of splits:3 3个MapTask1个ReduceTask

2.5.2 实现2 // 如果不设置InputFormat它默认用的是TextInputFormat.classjob.setInputFormatClass(CombineTextInputFormat.class);//虚拟存储切片最大值设置4mCombineTextInputFormat.setMaxInputSplitSize(job,);

日志 number of splits:1 1个MapTask1个ReduceTask

3、MapReduce工作流程

图解

3.1 客户端

有一个待处理的文件 200M

读取文件后对数据进行切片分析 第一个切片0-128M 第二个切片128-200M

提交信息 本地模式 — job.split 切片信息 — job.xml 默认的配置信息 yarn模式 — job.split 切片信息 — job.xml 默认的配置信息 — wc.jar

3.2 Yarn yarn开启一个MrAppMaster整个任务运行的老大MrAppMaster 会去读取切片信息开启和切片个数对应的MapTask个数 3.3 MapTask 使用读取数据InputFormat子类TextInputFormat读取数据 k是偏移量v是一行内容读后的数据返回给Mapper经过业务逻辑处理 kv 根据业务逻辑定 3.4 Shuffle Mapper计算后的数据会经过getPartition() 确认数据存放在哪个分区中给数据打上分区标记输出到环形缓冲区 环形缓冲区内存 一半用于存储索引一半用于存储数据 左侧存索引keyindex索引partition数据的分区keystart数据key开始位置valstart数据value开始位置

左侧存索引

右侧存数据valuekeyMapper的输出数据k根据业务逻辑定valueMapper的输出数据v根据业务逻辑定unsued

默认大小是100M当数据达到80%后反向溢写

图解

a、往环形缓冲区填写数据左侧填充索引右侧填充数据

b、当达到80%后最取剩余空间20%的中间位反向填充数据

c、内存中的数据反向溢写

d、如果反向填充的速度大于溢写速度则等待填充

达到80%后先对数据按照分区快排排key的索引反向溢写落盘 分区——每个分区对应一个Reducer之后每个Reducer会拉取对应分区的数据

将多个分区的数据环形缓存区的80%溢写成1个文件 文件数——此时这个文件个数只有1个如果是2个分区则可能是左边为分区1右边为分区2

第9步的溢写可能会有多次则会有多个溢写文件。此时对多个溢写文件进行归并排序 归并排序——使用归并排序是因为每个溢写文件都是有序的则采用归并排序的方式最快 文件数——需要归并的文件 -a- 此时需要归并的文件一般为1-2个因为切片数默认为128M -b- 当环形缓冲区的数据达到 80M 开始溢写生成第一个溢写文件超过80M则会有第二个溢写文件 -c- 如果是不可切片的压缩数据则会有超过2个需要归并的文件 -d- 归并后会生成1个文件写入磁盘等待Reduce拉取数据

对归并后的文件合并Combiner合并是一个不能影响业务逻辑的可选优化手段另外还有一个可选的步骤压缩减少网络IO 将a,1,a,1合并成a,2

3.5 MaAppMaster 启动相应数量的ReduceTask并告知处理的数据分区 ReduceTask一般是在MapTask快结束后开启的如果是源数据较小则是在MapTask结束后开启 3.6 ReduceTask 主动从每个MapTask对应的分区拉取数据全局归并排序生成一个文件不会进行Combine 拉取文件 —— 拉取的时候会先放在内存中如果内存不够的情况下会落盘 归并排序 —— 注意要和Combine合并做区分 MapTask不Combine后生成文件示例 —— a,1a,1b,1c,1c,1 按照相同的key做分组可选—— 因为每个reduce()方法都会处理相同key的数据每个Reducer读取一组数据 3.7 OutputFormat 由 OutputFormat 的默认子类 TextOutputFormat 往外写数据形成最终输出文件 part-r-00000 …… 4、Shuffle机制 4.1 shuffle

在Map方法之后Reduce方法之前的过程叫做 shuffle 这一块会把这个过程详细讲一遍

4.2 Partition分区 4.2.1 默认的HashPartition分区

根据 key 的 hashcode 值 对 ReduceTask 个数取模 用户没法控制哪一个Key存在哪一个分区 例如求wordcount。最终生成2个文件HashPartition分区无法控制源数据a,1b,1c,1中 a和b是否在同一个文件

如果想完成a-p在一个分区q-z在另一个分区只能重写分区方法自定义分区

ReduceTask——指最终输出文件个数

代码——在Driver中设置分区个数 - 设置分区个数 job.setNumReduceTasks(2);

4.2.1.1 分区数>2hash分区

分区源码会产生多个分区文件

1、

Overrideprotected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { // 1、获取一行数据 String line value.toString(); // 2、切割 String[] words line.split( ); // 3、循环写出 for (String word : words) { outKey.set(word); context.write(outKey, outValue); } }}

2、WrappedMapper

3、

Overridepublic void write(KEYOUT key, VALUEOUT value) throws IOException, InterruptedException { mapContext.write(key, value);}

4、MapTask

5、

Overridepublic void write(K key, V value) throws IOException, InterruptedException { collector.collect(key, value, partitioner.getPartition(key, value, partitions));}

6、先执行参数里面的方法

partitioner.getPartition(key, value, partitions)

7、HashPartitioner

8、

public class HashPartitioner<K2, V2> implements Partitioner<K2, V2> { public void configure(JobConf job) {}/** Use {link Object#hashCode()} to partition. */public int getPartition(K2 key, V2 value, int numReduceTasks) {return (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks; }} 4.2.1.2 使用默认的分区规则分区数1

分区源码只会产生一个分区文件 源码部分和分区个数大于等于2中1-7步完全一致

1、

Overrideprotected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { // 1、获取一行数据 String line value.toString(); // 2、切割 String[] words line.split( ); // 3、循环写出 for (String word : words) { outKey.set(word); context.write(outKey, outValue); } }}

2、WrappedMapper

3、

Overridepublic void write(KEYOUT key, VALUEOUT value) throws IOException, InterruptedException { mapContext.write(key, value);}

4、MapTask

5、

Overridepublic void write(K key, V value) throws IOException, InterruptedException { collector.collect(key, value, partitioner.getPartition(key, value, partitions));}

6、先执行参数里面的方法

partitioner.getPartition(key, value, partitions)

7、HashPartitioner 大于1走hash分区否则只会生成一个分区0

8、

NewOutputCollector(org.apache.hadoop.mapreduce.JobContext jobContext, JobConf job, TaskUmbilicalProtocol umbilical, TaskReporter reporter ) throws IOException, ClassNotFoundException { collector createSortingCollector(job, reporter); partitions jobContext.getNumReduceTasks(); if (partitions > 1) { partitioner (org.apache.hadoop.mapreduce.Partitioner<K,V>) ReflectionUtils.newInstance(jobContext.getPartitionerClass(), job); } else { partitioner new org.apache.hadoop.mapreduce.Partitioner<K,V>() { Override public int getPartition(K key, V value, int numPartitions) { return partitions - 1; } }; }} 4.2.2 自定义分区步骤 自定义类继承Partitioner重写getPartition()在Job驱动中设置自定义Partitioner自定义Partition后要根据自定义Partitioner的逻辑设置相应数量的ReduceTask 4.3 Partition分区案例实操 4.3.1 需求

统计结果按照手机号归属地将不同省份的数据输出到相应文件中

输入数据

自增id手机号网络IP域名上行下行网络状态2.196.100.1www.atguigu.com.196.100..196.100..168.100..168.100.2www.atguigu.com.168.100.3www.atguigu.com.168.100..168.100.5www.hao123.com.168.100..168.100.7www.shouhu.com.168.100.8www.baidu.com.168.100.9www.atguigu.com.168.100..168.100..168.100.12www.qq.com.168.100.13www.gaga.com.168.100.14www.qinghua.com2.168.100.15www.sogou.com.168.100.16www.baidu.com2.168.100..168.100.18www.alibaba.com.168.100.

结果分区

手机号分区号136分区0137分区1138分区2139分区3其他分区44.3.2 增加分区

增加一个ProvincePartition分区

输出类型和map()的输出保持一致 分区在Map端将数据写入环形缓冲区之前继承Partition抽象类重写getPartition() 重写返回分区方法 按照条件返回分区5个分区号 0-4号Driver驱动类 指定自定义分区 // 指定自定义类 job.setPartitionerClass(ProvincePartitioner.class);指定自定义分区数量 // 指定5个分区 ReduceTask 的个数 job.setNumReduceTasks(5); 4.3.3 结果

4.3.4 MapReduce 分区数 与 Reduce 个数

修改Reduce个数

分区数ReduceTask个数结果是否报错现象指定4(2-4)个分区job.setNumReduceTasks(4);4非法分区无法正常运行5个分区无法正常被4个Reduce消费指定1个分区job.setNumReduceTasks(1);1只会生成1个文件正常运行走默认1个分区指定6(>5)个分区job.setNumReduceTasks(6);6生成6个文件正常运行会生成1个文件4.4 WritableComparable排序 4.4.1 概念

排序是MapReduce框架中最重要的操作之一 在MapTask和ReduceTask中均会按照key进行排序属于默认行为如果无法排序则会报错

MapTask 快排读取一个超过80M的文件后发生溢写产生的排序 归并对上方产生的多个溢写文件进行归并排序ReduceTask 归并对拉取对应分区内的数据进行一次归并排序默认排序是对key的索引按照字典排序实现方式是快排 4.4.2 分类 排序方式排序规则分区数说明例子部分排序对每个记录的键排序保证每个文件内部有序超过1个分区分区内排序例如对地区分区再对金额排倒序全排序最终只会输出1个文件且文件内部有序只有1个分区效率极低丧失MapReduce的并行能力例如整表order by SELECT * FROM TABLE ORDER BY XX二次排序自定义排序自定义重写compareTo方法自定义之前是将FlowBean放在value端不是在key端不需要重写排序方法如果需要放在key上进行传输则一定要重写排序方法例如先按照总流量排序在按照上行流量排序4.5 WritableComparable排序案例实操全排序 4.5.1 需求

对之前的输出结果按照手机总流量的倒序进行排序

手机号上下总55484.5.2 需求分析 Key 上、下、总流量组合成FlowBeanValue手机号 实现WritableComparable接口重写compareTo方法Mapper类 context.write(bean,手机号)Reduce类 循环输出避免总流量相同的情况 阶段keyvaluemap输入偏移量一行数据map输出FlowBeanNullWriablereduce输入TextNullWriablereduce输出TextNullWriable4.5.3 代码编写

Driver类

public class FlowDriver {public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException { // 1、获取配置信息获取Job对象实例 Configuration conf new Configuration(); Job job Job.getInstance(conf); // 2、指定Jar包所在的本地路径 通过全类名反射Jar包在什么位置 job.setJarByClass(FlowDriver.class); // 3、关联Mapper、Reducer job.setMapperClass(FlowMapper.class); job.setReducerClass(FlowReduce.class); // 4、指定Mapper输出数据的k,v类型 job.setMapOutputKeyClass(FlowBean.class); job.setMapOutputValueClass(Text.class); // 5、指定最终输出数据的k,v类型 job.setOutputKeyClass(Text.class); job.setOutputValueClass(FlowBean.class); // 6、指定输入文件所在目录 FileInputFormat.setInputPaths(job,new Path(D:\\hadoop\\output27)); // 7、指定输出文件所在目录 FileOutputFormat.setOutputPath(job,new Path(D:\\hadoop\\output35)); // 8、提交作业 boolean result job.waitForCompletion(true); //成功返回0 System.exit(result?0:1); }}

FlowBean类

package com.atguigu.mapreduce.writableComparable;import org.apache.hadoop.io.WritableComparable;import java.io.DataInput;import java.io.DataOutput;import java.io.IOException;/*** a、实现writable接口* b、反序列化时需要反射调用空参构造函数所以必须有空参构造* c、重写序列化方法* d、重写反序列化方法* e、注意反序列化的顺序和序列化的顺序完全一致* f、要想把结果显示在文件中需要重写toString()可用\t分开方便后续用。* 如果不重写默认打印的是地址值* 打印地址里的数据需要使用toString()* g、如果需要将自定义的bean放在key中传输则还需要实现Comparable接口* Map<key,val,key,val>其中第二个key需要实现Comparable接口* 因为shuffle阶段会对key进行排序 */public class FlowBean implements WritableComparable<FlowBean> {private long upFlow; // 上行流量private long dowmFlow; // 下行流量private long sumFlow; // 总流量// 空参构造public FlowBean() {}Overridepublic void write(DataOutput out) throws IOException { out.writeLong(upFlow); out.writeLong(dowmFlow); out.writeLong(sumFlow);}Overridepublic void readFields(DataInput in) throws IOException { upFlow in.readLong(); dowmFlow in.readLong(); sumFlow in.readLong();}public long getUpFlow() { return upFlow;}public void setUpFlow(long upFlow) { this.upFlow upFlow;}public long getDowmFlow() { return dowmFlow;}public void setDowmFlow(long dowmFlow) { this.dowmFlow dowmFlow;}public long getSumFlow() { return sumFlow;}public void setSumFlow(long sumFlow) { this.sumFlow sumFlow;}public void setSumFlow() { this.sumFlow this.dowmFlow this.upFlow;}Overridepublic String toString() { // 最终展示效果 return upFlow \t dowmFlow \t sumFlow;}Overridepublic int compareTo(FlowBean o) { // 总流量的倒序排序 if(this.sumFlow > o.sumFlow){ return -1; }else if(this.sumFlow < o.sumFlow){ return 1; }else { return 0; } }}

Mapper类

public class FlowMapper extends Mapper<LongWritable, Text,FlowBean,Text > {private FlowBean outK new FlowBean();private Text outV new Text();Overrideprotected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { // 1 获取一行 // String line value.toString(); // 2 切割 // String[] word line.split(\t); // 3 抓取想要的数据 //2.196.100.1www.atguigu.com // 手机号 、 上行流量 、 下行流量 String phone word[1]; // 4 封装 outV.set(word[0]); outK.setUpFlow(Long.parseLong(word[1])); outK.setDowmFlow(Long.parseLong(word[2])); outK.setSumFlow(); // 5 写出 context.write(outK,outV); }}

Reduce类

public class FlowReduce extends Reducer<FlowBean,Text ,Text , FlowBean> {private FlowBean outV new FlowBean();Overrideprotected void reduce(FlowBean key, Iterable<Text> values, Context context) throws IOException, InterruptedException { for(Text value : values){ context.write(value,key); } }} 4.5.4 结果

只按照总流量倒序排

手机号上行流量下行流量总流量02404.6 WritableComparable排序案例实操二次排序 4.6.1 需求

对之前的输出结果按照手机总流量的倒序进行排序如果总流量相同则按照上行流量倒序

4.6.2 代码编写

FlowBean类

package com.atguigu.mapreduce.writableComparable2;import org.apache.hadoop.io.WritableComparable;import java.io.DataInput;import java.io.DataOutput;import java.io.IOException;public class FlowBean implements WritableComparable<FlowBean> {private long upFlow; // 上行流量private long dowmFlow; // 下行流量private long sumFlow; // 总流量// 空参构造public FlowBean() {}Overridepublic void write(DataOutput out) throws IOException { out.writeLong(upFlow); out.writeLong(dowmFlow); out.writeLong(sumFlow);}Overridepublic void readFields(DataInput in) throws IOException { upFlow in.readLong(); dowmFlow in.readLong(); sumFlow in.readLong();}public long getUpFlow() { return upFlow;}public void setUpFlow(long upFlow) { this.upFlow upFlow;}public long getDowmFlow() { return dowmFlow;}public void setDowmFlow(long dowmFlow) { this.dowmFlow dowmFlow;}public long getSumFlow() { return sumFlow;}public void setSumFlow(long sumFlow) { this.sumFlow sumFlow;}public void setSumFlow() { this.sumFlow this.dowmFlow this.upFlow;}Overridepublic String toString() { // 最终展示效果 return upFlow \t dowmFlow \t sumFlow;}Overridepublic int compareTo(FlowBean o) { // 总流量的倒序排序 if(this.sumFlow > o.sumFlow){ return -1; }else if(this.sumFlow < o.sumFlow){ return 1; }else { // 上行流量的正序排序 if(this.upFlow > o.upFlow){ return -1; }else if(this.upFlow < o.upFlow){ return 1; }else { return 0; } } }}

Driver类 照旧 Mapper类 照旧 Reduce类 照旧

4.6.3 结果

先按照总流量倒序排在按照上行流量正序排

手机号上行流量下行流量总流量02404.7 WritableComparable排序案例实操区间内排序 4.7.1 需求

对之前的输出结果输出到5个分区并按照手机总流量的倒序进行排序如果总流量相同则按照上行流量倒序。

4.7.2 代码编写

Partitioner类

public class ProvincePartitioner2 extends Partitioner<FlowBean, Text> {Overridepublic int getPartition(FlowBean flowBean, Text text, int numPartitions) { // text 是手机号 String phone text.toString(); // 取手机号前三位 String prePhone phone.substring(0, 3); // 一般把常量放在前面防止空指针 int partition; if(136.equals(prePhone)){ partition 0; }else if (137.equals(prePhone)){ partition 1; }else if (138.equals(prePhone)){ partition 2; }else if (139.equals(prePhone)){ partition 3; }else { partition 4; } return partition; }}

Driver类 增加

// 设置自定义分区器job.setPartitionerClass(ProvincePartitioner2.class);// 设置对应的ReduceTask的个数job.setNumReduceTasks(5);

FlowBean类 照旧 Mapper类 照旧 Reduce类 照旧

4.7.3 结果

生成5个分区文件每个文件按照手机总流量的倒序进行排序

part-r-00000

手机号上行流量下行流量总流量848

part-r-00001

手机号上行流量下行流量总流量

part-r-00002

手机号上行流量下行流量总流量0264

part-r-00003

手机号上行流量下行流量总流量00240

part-r-00004

手机号上行流量下行流量总流量.8 Combiner合并 4.8.1 位置 环形缓冲区溢写到磁盘对归并后准备溢写到磁盘的数据合并 4.8.2 条件

溢写文件很多之前归并的时候没有归并完

例如 一个不可切分的2400M的文件会生成30个溢写文件归并的时候默认归并10个文件则会归并成3个归并文件3个归并文件再Combiner合并1次

4.8.3 操作

将a,1,a,1合并成a,2一定不能影响业务

4.8.4 好处

对每一个MapTask的输出结果进行局部汇总以减少向Reduce端传输的数据量

4.8.5 说明 Combiner是MR程序中处于Mapper和Reducer之外一个组件 处于可选选项类似于插件Combiner父类是ReducerCombiner和Reducer的运行位置不一样Combiner运行在每一个MapTask上Reducer是全局所有Mapper的输出结果 不能影响业务逻辑例如求平均值 4.9 Combiner合并案例实操 4.9.1 方式1

在WordCount的基础上增加WordCountCombiner extends Reducer<Text , IntWritable , Text , IntWritable>

public class WordCountCombiner extends Reducer<Text , IntWritable , Text , IntWritable> {private IntWritable outV new IntWritable();Overrideprotected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException { int sum 0; for (IntWritable value : values) { sum value.get(); } outV.set(sum); context.write(key,outV); }}

Driver

job.setCombinerClass(WordCountCombiner.class); 4.9.2 方式2

Driver

// 指定需要使用Combiner以及用哪个类作为Combiner的逻辑job.setCombinerClass(WordCountReducer.class); 4.9.3 结果

Reduce shuffle bytes88 会发现比之前变少了

4.9.4 之后实现combiner的套路 判断是否影响业务逻辑直接在驱动中添加Reducer 5、OutputFormat数据输出 5.1 OutputFormat接口实现类 5.1.1 位置

Reducer方法之后并不是直接将数据写入文件中而是经过了一个OutputFormat OutputFormat中有一个核心方法RecordWriter来决定是按行写写成一个文件还是写入数据库默认是按行写TextOutputFormat

5.1.2 自定义OutputFormat 应用场景 将数据写入Mysql或者es中步骤 继承FileOutputFormat 改写RecordWrite具体改写输出数据的方法write() 5.2 自定义OutputFormat案例实操 5.2.1 需求分析

处理输入日志如果这个日志包含atguigu则输出到d:/atguigu.log否则输出到d:/other.log

输入数据http://www.baidu.comhttp://www.google.comhttp://cn.bing.comhttp://www.atguigu.comhttp://www.sohu.comhttp://www.sina.comhttp://www.sin2a.comhttp://www.sin2desa.comhttp://www.sindsafa.com

分析

Map阶段泛型<Text , NullWritable> 无需对数据进行聚合操作Reduce阶段泛型<Text , NullWritable> 无需对数据进行聚合操作自定义OutputFormat输出类 继承RecordWriter 创建两条输出流 atguiguOut流 otherOut流驱动类Driver中添加自定义OutputFormat //设置自定义的outputformatjob.setOutputFormatClass(LogOutputFormat.class);FileInputFormat.setInputPaths(job, new Path(D:\\input));//虽然我们自定义了outputformat但是因为我们的outputformat继承自fileoutputformat//而fileoutputformat要输出一个_SUCCESS文件所以在这还得指定一个输出目录FileOutputFormat.setOutputPath(job, new Path(D:\\logoutput)); 阶段keyvaluemap输入偏移量一行数据map输出TextNullWriablereduce输入TextNullWriablereduce输出TextNullWriable5.2.2 代码编写

Mapper

public class LogMapper extends Mapper<LongWritable, Text,Text, NullWritable> {Overrideprotected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { // 输入数据 // http://www.baidu.com // http://www.google.com context.write(value,NullWritable.get()); }}

Reducer

public class LogOutputFormat extends FileOutputFormat<Text, NullWritable> {Overridepublic RecordWriter<Text, NullWritable> getRecordWriter(TaskAttemptContext job) throws IOException, InterruptedException { LogRecordWriter lrw new LogRecordWriter(job); return lrw; }}

LogRecordWriter

public class LogRecordWriter extends RecordWriter<Text, NullWritable> {private FSDataOutputStream atguiguOut;private FSDataOutputStream otherOut;public LogRecordWriter(TaskAttemptContext job) { // 创建两条流 try { // 使用Job自带的配置信息而不是自己new一个 FileSystem fs FileSystem.get(job.getConfiguration()); atguiguOut fs.create(new Path(D:\\hadoop\\output57\\atguigu.log)); otherOut fs.create(new Path(D:\\hadoop\\output57\\other.log)); } catch (IOException e) { e.printStackTrace(); }}Overridepublic void write(Text key, NullWritable value) throws IOException, InterruptedException { // 创建两条流 String log key.toString(); if(log.contains(atguigu)){ atguiguOut.writeBytes(log \n); }else { otherOut.writeBytes(log \n); }}Overridepublic void close(TaskAttemptContext context) throws IOException, InterruptedException { // 关闭两条流 IOUtils.closeStream(atguiguOut); IOUtils.closeStream(otherOut); }}

LogOutputFormat

public class LogOutputFormat extends FileOutputFormat<Text, NullWritable> {Overridepublic RecordWriter<Text, NullWritable> getRecordWriter(TaskAttemptContext job) throws IOException, InterruptedException { LogRecordWriter lrw new LogRecordWriter(job); return lrw; }}

LogDriver

public class LogDriver {public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException { Configuration conf new Configuration(); Job job Job.getInstance(conf); job.setJarByClass(LogDriver.class); job.setMapperClass(LogMapper.class); job.setReducerClass(LogReducer.class); job.setMapOutputKeyClass(Text.class); job.setMapOutputValueClass(NullWritable.class); job.setOutputKeyClass(Text.class); job.setOutputValueClass(NullWritable.class); //设置自定义的outputformat job.setOutputFormatClass(LogOutputFormat.class); FileInputFormat.setInputPaths(job, new Path(D:\\input\\inputoutputformat)); //虽然我们自定义了outputformat但是因为我们的outputformat继承自fileoutputformat //而fileoutputformat要输出一个_SUCCESS文件所以在这还得指定一个输出目录 FileOutputFormat.setOutputPath(job, new Path(D:\\hadoop\\output57)); boolean b job.waitForCompletion(true); System.exit(b ? 0 : 1); }} 5.2.3 最终结果

文件图示

atguigu.log http://www.atguigu.com

other.log http://cn.bing.com http://www.baidu.com http://www.google.com http://www.sin2a.com http://www.sin2desa.com http://www.sina.com http://www.sindsafa.com http://www.sohu.com

5.3 MapTask工作机制 5.3.1 图示

5.3.2 5个阶段

Read阶段

含有带读取的源数据客户端对任务进行切片划分提交任务给yarnjob.split、wc.jar、job.xmlyarn会开启一个MRAppMaster

Map阶段

使用默认的TextInputFormat来读取数据数据会进入map()运行用户自己写的逻辑

Collect阶段

向环形缓冲区存储数据到达80%的时候快排

溢写阶段

反向溢写

Merge阶段

归并排序 5.4 ReduceTask工作机制 5.4.1 图示

5.4.2 3个阶段

Copy阶段

拉取自己指定分区的数据

Sort阶段

进行一次归并排序

Reduce阶段

进入到Reducer方法中后面使用OutputForamat输出到文件 5.5 ReduceTask 并行度决定机制

MapTask并行度由切片个数决定

切片个数由切片规则决定 默认块大小是否有Combine

手动设置

Job.setNumReduceTasks(4);

注意

ReduceTask 0

没有Reduce阶段 输出文件和Map个数一致

ReduceTask 1 默认值就是1只输出一个文件如果数据分布不均匀则可能会在Reduce端产生数据倾斜 后面会讲如何使用随机数打散解决某个Reduce数据量过多的情况 有些时候根据业务需求需要全局汇总则只会有一个ReduceTask如果分区个数不是1但是ReduceTask 1则不执行分区因为执行分区首先判断ReduceTask是否大于1 5.6 MapTask & ReduceTask源码解析 6、Join应用 6.1 Reduce Join 6.1.1 需求分析

将两张表去做关联

订单数据表 t_order

idpidamount

商品信息表 t_product

pidpname01小米02华为03格力

最终数据形式

idpnameamount1001小米11004小米41002华为21005华为51003格力31006格力66.1.2 问题 怎么判断读取进来的是哪张表 切片调用 getPath().getName() 来得知 怎么设置Map的输出key 因为Reduce会按照key拉取数据 使用关联字段pid做为key 怎么设置Map的输出value javabean中包含订单id 数量产品名称表名 MapTask默认会对产品id排序 产品idkey订单idvalue数量value产品名称value表名valueorderorder01小米pdorderorder02华为pdorderorder03格力pdReduceTask中的数据 使用tableBean作为keyvalue为NullWritable 输出时会使用toString() 订单idtoString产品名称toString数量toString1004小米41001小米11005华为51002华为21006格力61003格力3

a、会根据key产品id01、02、03分别发送到三个ReduceTask方法中去 b、然后创建两个集合一个用来存储订单数据表一个用来存储商品信息表 c、循环遍历订单数据表集合并且给产品名称赋值

阶段keyvaluemap输入偏移量一行数据map输出产品idTableBeanreduce输入产品idTableBeanreduce输出TableBeanNullWriable6.2 Reduce Join案例实操

TableBean

public class TableBean implements Writable {private String id; // 订单idprivate String pid; // 商品idprivate int amount; // 商品数量private String pname; // 产品数量private String flag; // 判断是订单表还是产品表// 空参构造public TableBean(){}public String getId() { return id;}public void setId(String id) { this.id id;}public String getPid() { return pid;}public void setPid(String pid) { this.pid pid;}public int getAmount() { return amount;}public void setAmount(int amount) { this.amount amount;}public String getPname() { return pname;}public void setPname(String pname) { this.pname pname;}public String getFlag() { return flag;}public void setFlag(String flag) { this.flag flag;}// 序列化方法Overridepublic void write(DataOutput out) throws IOException { out.writeUTF(id); // 可以通过ctrl p 看到方法传入的参数是String类型 out.writeUTF(pid); out.write(amount); out.writeUTF(pname); out.writeUTF(flag);}// 反序列化方法Overridepublic void readFields(DataInput in) throws IOException { this.id in.readUTF(); this.pid in.readUTF(); this.amount in.readInt(); this.pname in.readUTF(); this.flag in.readUTF();}// 未来要打印出来 所以要重写toStringOverridepublic String toString() { return id \t pname \t amount; }} 实现Writale接口设置5大属性空参构造get set重写序列化和反序列化方法按照想要的结果重写同String方法

Mapper

public class TableMapper extends Mapper<LongWritable, Text ,Text , TableBean > {private String fileName;private Text outK new Text();private TableBean outV new TableBean();// 初始化方法Overrideprotected void setup(Context context) throws IOException, InterruptedException { InputSplit split context.getInputSplit(); // 强制转换成子类 FileSplit FileSplit fileSplit (FileSplit) split; // 因为一个文件会开启一个MapTask所以选择在setup中获取文件名 // ctrl alt f 提升全局变量 fileName fileSplit.getPath().getName();}Overrideprotected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { // 1 获取一行数据 String line value.toString(); if(fileName.contains(order)){ // 处理的是订单表 String[] split line.split(\t); outK.set(split[1]); outV.setId(split[0]); // 切割完之后还是一个String类型的数组统计商品数量时需要进行格式转换把String转换成int outV.setAmount(Integer.parseInt(split[2])); // 取不到的话也要给一个默认值否则会报序列化错误 outV.setPname(); outV.setFlag(order); }else{ // 处理的是商品表 String[] split line.split(\t); //封装outK outK.set(split[0]); //封装outV outV.setId(); outV.setPid(split[0]); // 没有商品数量给0 outV.setAmount(0); outV.setPname(split[1]); outV.setFlag(pd); } // 写出数据 context.write(outK,outV); }} 获取文件名称 因为一个文件会开启一个MapTask所以选择在setup中获取文件名ctrl alt f 提升全局变量优化方式一个MapTask读取一次文件名map() 根据表名来判断是业务逻辑

Reducer

public class TableReducer extends Reducer<Text,TableBean,TableBean, NullWritable> {// 因为输出的结果集里只需要包含 tableBean 里 重写的toString() 所以只需要key不需要valueOverrideprotected void reduce(Text key, Iterable<TableBean> values, Context context) throws IOException, InterruptedException { // 01 1001 1 order // 01 1001 4 order // 01 小米 pd // 存储多个订单数组 以及 一个TableBean ArrayList<TableBean> orderBeans new ArrayList<>(); TableBean pdBean new TableBean(); // 循环遍历 for (TableBean value : values) { if(order.equals(value.getFlag())){ // 由于Hadoop修改了数组的源码使得这个数组和Java中的数组不一致 // 每次读取一条数据都需要先创建一个对象 // 然后将这个这条数据赋给这个对象 // 否则当有多个对象传给这个数组时后一条数据都会覆盖掉前一条数据使得这个数组永远只会有最后传入进来的数据 TableBean tmpTable new TableBean(); try { // 将A对象中的值赋值给B对象 // 第一个参数 目标地址 第二个参数 原始数据 BeanUtils.copyProperties(tmpTable,value); } catch (IllegalAccessException e) { e.printStackTrace(); } catch (InvocationTargetException e) { e.printStackTrace(); } orderBeans.add(tmpTable); }else { // 商品表 try { BeanUtils.copyProperties(pdBean,value); } catch (IllegalAccessException e) { e.printStackTrace(); } catch (InvocationTargetException e) { e.printStackTrace(); } } } // 循环遍历orderBeans赋值pdBean for (TableBean orderBean : orderBeans) { orderBean.setPname(pdBean.getPname()); //写出修改后的orderBean对象 context.write(orderBean,NullWritable.get()); } }}

Driver

public class TableDriver {public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException { Job job Job.getInstance(new Configuration()); job.setJarByClass(TableDriver.class); job.setMapperClass(TableMapper.class); job.setReducerClass(TableReducer.class); job.setMapOutputKeyClass(Text.class); job.setMapOutputValueClass(TableBean.class); job.setOutputKeyClass(TableBean.class); job.setOutputValueClass(NullWritable.class); // inputtable目录下有两个文件 FileInputFormat.setInputPaths(job, new Path(D:\\input\\inputtable)); FileOutputFormat.setOutputPath(job, new Path(D:\\hadoop\\output53)); boolean b job.waitForCompletion(true); System.exit(b ? 0 : 1); }}

如果在编写过程遇到任何报错记得使用debug来定位报错位置

结果

订单idtoString产品名称toString数量toString1004小米41001小米11005华为51002华为21006格力61003格力36.3 Map Join 6.3.1 需求分析

Reduce Join 存在的问题

由于合并的方式在Reduce端可能导致Reduce端处理数据的压力过大容易产生数据倾斜问题一般情况下运行任务时都是MapTask的数量多ReduceTask的数量少。并且每个MapTask只处理128M的数据量

解决思路

将两张表在Map端做关联但是由于两张表位于不同的文件中则会有不同MapTask去处理相应的数据 先将小表pd.txt缓存到内存写入缓存集合再将大表order.txt使用MapTask正常读取

具体代码

商品信息表 t_product 加载缓存采用DistributedCache

Driver

//缓存普通文件到Task运行节点存放商品表数据job.addCacheFile(new URI(file:///e:/cache/pd.txt));//如果是集群运行,需要设置HDFS路径job.addCacheFile(new URI(hdfs://hadoop102:8020/cache/pd.txt));

商品信息表 t_product

pidpname01小米02华为03格力订单数据表 t_order 正常读取

Driver

// 6 设置输入输出路径存放订单表数据FileInputFormat.setInputPaths(job, new Path(D:\\input\\inputtable2));

订单数据表 t_order

idpidamount设置reduceTask数量为0

Driver

// Map端Join的逻辑不需要Reduce阶段设置reduceTask数量为0job.setNumReduceTasks(0); 阶段keyvaluemap输入偏移量一行数据map输出一行数据NullWriable

Mapper

setup() 获取缓存文件读取一行缓存文件切割将缓存文件加载到集合 <01,小米><02,华为><03,格力> map() 获取一行数据截取获取pid获取订单id和商品名称拼接写出

使用场景

小表和大表关联 6.4 Map Join案例实操

Driver

public class MapJoinDriver {public static void main(String[] args) throws IOException, URISyntaxException, ClassNotFoundException, InterruptedException { // 1 获取job信息 Configuration conf new Configuration(); Job job Job.getInstance(conf); // 2 设置加载jar包路径 job.setJarByClass(MapJoinDriver.class); // 3 关联mapper job.setMapperClass(MapJoinMapper.class); // 4 设置Map输出KV类型 job.setMapOutputKeyClass(Text.class); job.setMapOutputValueClass(NullWritable.class); // 5 设置最终输出KV类型 job.setOutputKeyClass(Text.class); job.setOutputValueClass(NullWritable.class); // 加载缓存数据 job.addCacheFile(new URI(file:///D:/input/inputtablecache/pd.txt)); // Map端Join的逻辑不需要Reduce阶段设置reduceTask数量为0 job.setNumReduceTasks(0); // 6 设置输入输出路径 FileInputFormat.setInputPaths(job, new Path(D:\\input\\inputtable2)); FileOutputFormat.setOutputPath(job, new Path(D:\\hadoop\\output59)); // 7 提交 boolean b job.waitForCompletion(true); System.exit(b ? 0 : 1); }}

Mapper

public class MapJoinMapper extends Mapper<LongWritable , Text , Text , NullWritable>{private Map<String,String> pdMap new HashMap<>();private Text outK new Text();Overrideprotected void setup(Context context) throws IOException, InterruptedException { // 获取缓存文件并把文件内容封装到集合 pd.txt // 通过缓存文件得到小表数据 pd.txt // context 会得到多个缓存文件地址 cacheFiles[0] 指得到第一个缓存地址 URI[] cacheFiles context.getCacheFiles(); Path path new Path(cacheFiles[0]); // 获取文件系统对象 并开流 FileSystem fs FileSystem.get(context.getConfiguration()); FSDataInputStream fis fs.open(path); // 通过包装流转换为reader方便按行读取 BufferedReader reader new BufferedReader(new InputStreamReader(fis, UTF-8)); //逐行读取按行处理 String line; while(StringUtils.isNotEmpty(line reader.readLine())){ String[] fields line.split(\t); pdMap.put(fields[0],fields[1]); } //关流 IOUtils.closeStream(reader);}Overrideprotected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { // 处理 order.txt String line value.toString(); String[] fields line.split(\t); // 获取pname String pname pdMap.get(fields[1]); // 封装 outK.set(fields[0] \t pname \t fields[2]); context.write(outK,NullWritable.get()); }}

结果

订单idtoString产品名称toString数量toString1004小米41001小米11005华为51002华为21006格力61003格力37、 数据清洗ETL 7.1 概念 抽取Extract转换Transform加载Load做ETL往往只需要Mapper端不需要Reduce程序 7.2 需求分析

去除日志中字段个数小于等于11的数据

保留按照空格切分字段数11

1.206.126.5 - - [19/Sep/2013:05:47:24 0000] GET /wp-content/uploads/2013/08/jasmine1.png HTTP/1.1 200 136476 http://angularjs.cn/A0cL Mozilla/5.0 (Macintosh; Intel Mac OS X 10_7_5) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/29.0.1547.65 Safari/537.36

过滤掉按照空格切分字段数10

58.177.135.108 - - [19/Sep/2013:06:20:52 0000] - 400 0 - - 阶段keyvaluemap输入偏移量一行数据map输出一行数据NullWriable7.3 代码编写

Mapper

public class WebLogMapper extends Mapper<LongWritable , Text, Text , NullWritable> {Overrideprotected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { // 获取一行输入日志 String line value.toString(); // ETL boolean result parseLog(line,context); if(!result){ // 如果result是false则舍弃掉当前数据 return; } // 如果result是ture则保留当前数据 context.write(value,NullWritable.get());}private boolean parseLog(String line, Context context) { // 切割数据 String[] fields line.split( ); // 判断日志长度是否大于11 if(fields.length > 11){ return true; }else { return false; } }}

Driver

public class WebLogDriver {public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException { // 1 获取job信息 Configuration conf new Configuration(); Job job Job.getInstance(conf); // 2 加载jar包 job.setJarByClass(WebLogDriver.class); // 3 关联map job.setMapperClass(WebLogMapper.class); // 4 设置mapper输出的类型 job.setMapOutputKeyClass(Text.class); job.setMapOutputValueClass(NullWritable.class); // 5 设置最终输出类型 job.setOutputKeyClass(Text.class); job.setOutputValueClass(NullWritable.class); // 设置reducetask个数为0 job.setNumReduceTasks(0); // 6设置输入和输出路径 FileInputFormat.setInputPaths(job, new Path(D:\\input\\inputweblog)); FileOutputFormat.setOutputPath(job, new Path(D:\\hadoop\\output60)); // 7 提交 boolean b job.waitForCompletion(true); System.exit(b ? 0 : 1); }} 7.4 结果 条数变少输入14619条输出13770条 8、 MapReduce核心框架原理总结

MapReduce将数据的计算简单分成Map和Reduce两个阶段。Map阶段将原本很大的数据集拆分成多个小份在不同服务器上各个击破。Reduce阶段则将原本小份的数据结果汇总进一步计算得到最终结果。

步骤

InputFormat 1默认的是TextInputformat 输入kv key偏移量value一行内容 2处理小文件CombineTextInputFormat 把多个文件合并到一起统一切片Mapper 用户的业务逻辑 setup()初始化 map()用户的业务逻辑 clearup() 关闭资源分区 默认分区HashPartitioner 默认按照key的hash值%numreducetask个数 自定义分区排序 1部分排序 每个输出的文件内部有序 2全排序 一个reduce对所有数据整体排序 慎用很容易OOM 3二次排序 自定义排序范畴 实现 WritableComparable接口 重写compareTo() 总流量倒序 按照上行流量 正序Combiner 前提不影响业务逻辑 不能用于求平均值 提前聚合map > 解决数据倾斜的一个方法Reducer 用户的业务逻辑 setup()初始化 reduce()用户的业务逻辑 clearup() 关闭资源OutputFormat 1默认TextOutputFormat 按行输出到文件 2自定义