zl程序教程

您现在的位置是:首页 >  工具

当前栏目

[笔记]MapReduce入门详解大数据

笔记数据入门 详解 MapReduce
2023-06-13 09:20:27 时间

有一个日志文件,记录了一些搜索关键词搜索的记录,xx.log
按照 搜索时间 搜索关键词 ….. 等等属性为规则的一行行排列,使用分布式运行大概有以下几个步骤:
这里写图片描述

1:任务资源的分发(jar文件,运算程序) 1.1:服务器运算资源对各个任务的分配 2:任务在各个节点上设置运行环境,启动执行 3:监视各个节点上任务的执行状态 4:如果有任务失败,还要设法重试 5:中间结果的调度、汇总
就以上几点来说,如果我们自己实现的话,随便一个实现起来都非常的困难,所以hadoop提供了解决方案:
1、1.1、2 — yarn 资源调度集群框架(hadoop原生)
3、4、5 — mapreduce分布式计算模型框架(hadoop原生) 其他的有:spark、storm
wordcount(统计)

示例需求:在hdfis中某个目录下,有一些日志文件,需要统计该文件中单词出现的次数

我们使用 hadoop提供的示例jar包来运行分析:(首先得模拟一些单词日志文件上传到hdfs上)
参数说明:运行什么例子 需要分析的数据文件/文件夹 统计结果输出目录

hadoop jar /home/hadoop/app/hadoop-2.4.1/share/hadoop/mapreduce/hadoop-mapreduce-examples-2.4.1.jar wordcount /wordCount/data/ /wordCount/output 

运行完毕之后,使用fs的查看文本命令查看输出的目录:hadoop fs -cat /wordCount/output/part-r-00000
pi(计算圆周率)

示例需求:计算圆周率

hadoop jar /home/hadoop/app/hadoop-2.4.1/share/hadoop/mapreduce/hadoop-mapreduce-examples-2.4.1.jar pi 10 10
MapReduce Helloworld wordcount(统计)实现

MR由两个阶段组成:Map和Reduce,用户只需要实现map()和reduce()两个函数,即可实现分布式计算,非常简单。
这两个函数的形参是key、value对,表示函数的输入信息。

mapper

import java.io.IOException; 

import org.apache.hadoop.io.LongWritable; 

import org.apache.hadoop.io.Text; 

import org.apache.hadoop.mapreduce.Mapper; 

 * @ClassName: WordCountMapper 

 * @Description:
 

 * KEYIN, 输入的偏移量 

 * VALUEIN, 输入的(读取)的value(一行文本) 

 * KEYOUT, 输出的key 

 * VALUEOUT,输出的value 

 * 

* @author zq
*
* @version V0.1
* @date 2015年4月12日 下午2:52:54
*/
public class WordCountMapper extends Mapper LongWritable, Text, Text, LongWritable {
// 偏移量、读取到的文本、处理之后的每个单词、单词出现的次数(输出一次为1)
@Override
protected void map(LongWritable key, Text value, Mapper LongWritable, Text, Text, LongWritable .Context context) throws IOException, InterruptedException {
String line = value.toString();
String[] words = line.split(" "); // 切分数据
for (String word : words) {
context.write(new Text(word), new LongWritable(1));
}
}
}

reducer

import java.io.IOException; 

import org.apache.hadoop.io.LongWritable; 

import org.apache.hadoop.io.Text; 

import org.apache.hadoop.mapreduce.Reducer; 

 * @ClassName: WordCountReducer 

 * @Description: 
 

 * 参数依次是: 

 * mapper中的输出类型(前两个) 

 * reducer中的输出类型(后两个) 

 * 

* @author zq
*
* @version V0.1
* @date 2015年4月12日 下午7:50:46
*/
public class WordCountReducer extends Reducer Text, LongWritable, Text, LongWritable {
@Override
protected void reduce(Text key, Iterable LongWritable values, Reducer Text, LongWritable, Text, LongWritable .Context context) throws IOException,
InterruptedException {
//统计工作
long count = 0;
//遍历values,累加到计数器中
for (LongWritable v : values) {
count+=v.get();
}
//输出一个单词key及其总次数
context.write(new Text(key), new LongWritable(count));
}
}

driver运行入口配置信息

import org.apache.hadoop.conf.Configuration; 

import org.apache.hadoop.fs.Path; 

import org.apache.hadoop.io.LongWritable; 

import org.apache.hadoop.io.Text; 

import org.apache.hadoop.mapreduce.Job; 

import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; 

import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; 

public class WordCountDriver { 

 public static void main(String[] args) throws Exception { 

 Configuration conf = new Configuration(); 

 // 构造一个job对象来封装本mapreduce业务到所有信息 

 Job wcjob = Job.getInstance(conf); 

 // 指定本job工作用到的jar包位置 

 wcjob.setJarByClass(WordCountDriver.class); 

 // 指定本job用到的mapper类 

 wcjob.setMapperClass(WordCountMapper.class); 

 // 指定本job用到的reducer类 

 wcjob.setReducerClass(WordCountReducer.class); 

 // 指定mapper输出的kv类型 

 wcjob.setMapOutputKeyClass(Text.class); 

 wcjob.setMapOutputValueClass(LongWritable.class); 

 // 指定reducer输出到kv数据类型,(setOutputKeyClass 

 // 会对mapper和reducer都起作用,如果上面mapper不设置的话) 

 wcjob.setOutputKeyClass(org.apache.hadoop.io.Text.class); 

 wcjob.setOutputValueClass(LongWritable.class); 

 // 指定程序处理到输入数据所在的路径 

 FileInputFormat.setInputPaths(wcjob, new Path("/wordCount/data/")); 

 // 指定程序处理到输出结果所在路径 

 FileOutputFormat.setOutputPath(wcjob, new Path("/wordCount/output/")); 

 // 将该job通过yarn的客户端进行提交 

 wcjob.waitForCompletion(true); 

那么问题来了,怎么运行该程序呢? 方式1:利用hadoop命令提交(集群运行模式) 1:首先把上面写好的程序打成jar包,上传到linux服务器中
注意:如果上传到hdfs中,运行hdfs中的jar的时候会提示:Not a valid JAR: /wordcount.jar 2:使用hadoop -jar 运行

hadoop jar /wordCount/jar/WordCount.jar cn.mapperReduce.mr1.WordCountDriver


注:如果打jar包的时候打成的是runJar,则这里可不用指定入口类


方式2:直接在eclipse中运行driver main函数(local本地运行模式:方便调试)仅限linux下

直接运行上面的代码会抛出错误:

Exception in thread "main" org.apache.hadoop.mapreduce.lib.input.InvalidInputException: Input path does not exist: file:/wordCount/data

**需要注意的点有2点:**

输入输出目录,要是hdfs目录 要指定jar包在本机上的位置(我们把jar包打到当前项目classPath下)
//1:指定为hdfs文件系统 

conf.set("fs.defaultFS", "hdfs://192.168.184.141:9000"); 

//2:指定jar包位置 

conf.set("mapreduce.job.jar", "WordCount.jar");

运行程序后可以发现:job编号为 local,此种方式只是在本机运行
2015-04-12 07:31:05,637 INFO [main] mapreduce.Job (Job.java:monitorAndPrintJob(1373)) Job job_local601569298_0001 completed successfully

替代方案:

//1:指定为hdfs文件系统
conf.set(“fs.defaultFS”, “hdfs://192.168.184.141:9000”);
该行代码可以用xml文件来替代:复制hadoop配置好的xml文件到clallPath下,也可以:
core-site.xml 和 hdfs-site.xml
原理:读取配置文件中的hdfs系统配置,替代代码


基于方式2:加入了yarn配置:
mapred-site.xml
yarn-site.xml
把以上两个配置文件扔到classpath路径下
原理:程序在运行的时候,读取到配置信息:mapreduce.framework.name 运行在yarn上,所以这样就能把程序运行在集群中了,方便跟踪源码了解过程和调试
可以发现info中显示job 没有了local,成功了运行

2015-04-12 07:44:34,324 INFO [main] mapreduce.Job (Job.java:monitorAndPrintJob(1373)) - Job job_1428810427154_0003 completed successfully

运算完成之后,可以通过 cat 命令,到输出目录中查看结果
hadoop fs -cat /wordCount/output/part-r-00000

[hadoop@had01 ~]$ hadoop fs -cat /wordCount/output/part-r-00000 

dst 1 

param 2 

path 2 

src 1 

zhuqiang 3
mapreduce运行逻辑

这里写图片描述
入门逻辑:
一个节点:
由InputFormat组件从hdfs上面读取需要分析的文件,每次读取一行输送给mapper,mapper处理完之后,交给shuffle把同一个key进行组合,排序之后,交给reducer,reducer进行汇总,并输出,再由OutputFormat输出结果到指定目录

yarn工作机制及job提交流程

这里写图片描述

小程序练习 统计日志流量

数据示例:

1363157990043 13925057413 00-1F-64-E1-E6-9A:CMCC 120.196.100.55 t3.baidu.com 搜索引擎 69 63 11058 48243 200 

1363157988072 13760778710 00-FD-07-A4-7B-08:CMCC 120.196.100.82 2 2 120 120 200
统计每个手机号码的上下行和总流量

mapReduce 程序,由于比较简单,写到同一个类中的

/** 

 * 统计每个号码的上行下行流量汇总 

 * @author hadoop 

public class FlowCount { 

 public static class FlowMapper extends 

 Mapper LongWritable, Text, Text, FlowInfo { 

 @Override 

 protected void map(LongWritable key, Text value, Context context) 

 throws IOException, InterruptedException { 

 // 每行数据示例:1363157984041 13660577991 5C-0E-8B-92-5C-20:CMCC-EASY 

 // 120.197.40.4 s19.cnzz.com 站点统计 24 9 6960 690 200 

 String[] split = StringUtils.split(value.toString(), "/t"); 

 Long up_flow = new Long(split[split.length - 3]); 

 Long down_flow = new Long(split[split.length - 2]); 

 context.write(new Text(split[1]), new FlowInfo(up_flow, down_flow)); 

 public static class FlowReducer extends 

 Reducer Text, FlowInfo, Text, FlowInfo { 

 @Override 

 protected void reduce(Text key, Iterable FlowInfo values, 

 Context context) throws IOException, InterruptedException { 

 Long up_flow = 0l; 

 Long down_flow = 0l; 

 for (FlowInfo ff : values) { 

 up_flow += ff.getUp_flow(); 

 down_flow += ff.getDown_flow(); 

 context.write(new Text(key), new FlowInfo(up_flow, down_flow)); 

 public static void main(String[] args) throws IOException, 

 ClassNotFoundException, InterruptedException { 

 // 配置信息 

 Configuration conf = new Configuration(); 

 conf.set("fs.defaultFS", "hdfs://192.168.184.141:9000"); 

 conf.set("mapreduce.job.jar", "FlowCount.jar"); 

 Job job = Job.getInstance(conf); 

 // 指定jar包 

 job.setJarByClass(FlowCount.class); 

 job.setMapperClass(FlowMapper.class); 

 job.setReducerClass(FlowReducer.class); 

 // 指定输入输出类型 

 job.setOutputKeyClass(Text.class); 

 job.setOutputValueClass(FlowInfo.class); 

 // 指定数据所在目录 

 FileInputFormat.setInputPaths(job, new Path("/wordCount/data/http/")); 

 // 制定结果输出目录 

 FileOutputFormat.setOutputPath(job, new Path("/wordCount/output/")); 

 job.waitForCompletion(true); 

}

由于输出的value需要多个业务参数:上行流量、下行流量,总流量,就需要自定义vaue

import org.apache.hadoop.io.WritableComparable; 

 * 流量信息,自定义value,需要注意: 

 * 1:必须留一个空参数构造,否则会抛出:init方法未找到 

 * 2:需要实现序列化和反序列化 

 * @author hadoop 

public class FlowInfo implements WritableComparable FlowInfo { 

 private long up_flow; // 上行 

 private long down_flow; // 下行 

 private long sum_flow; // 总流量 

 public FlowInfo(long up_flow, long down_flow) { 

 super(); 

 this.up_flow = up_flow; 

 this.down_flow = down_flow; 

 this.sum_flow = up_flow + down_flow; 

 public FlowInfo() { 

 super(); 

 public long getUp_flow() { 

 return up_flow; 

 public void setUp_flow(long up_flow) { 

 this.up_flow = up_flow; 

 public long getDown_flow() { 

 return down_flow; 

 public void setDown_flow(long down_flow) { 

 this.down_flow = down_flow; 

 public long getSum_flow() { 

 return sum_flow; 

 public void setSum_flow(long sum_flow) { 

 this.sum_flow = sum_flow; 

 @Override 

 public String toString() { 

 //key,value 是默认/t ,所以这里分割需要注意下 

 return this.up_flow + "/t" + this.down_flow + "/t" + sum_flow; 

 // 序列化方法,把需要序列化的字段输出 

 @Override 

 public void write(DataOutput out) throws IOException { 

 out.writeLong(this.up_flow); 

 out.writeLong(this.down_flow); 

 out.writeLong(this.sum_flow); 

 // 反序列化 

 @Override 

 public void readFields(DataInput in) throws IOException { 

 this.up_flow = in.readLong(); 

 this.down_flow = in.readLong(); 

 this.sum_flow = in.readLong(); 

 @Override 

 public int compareTo(FlowInfo o) { 

 // TODO Auto-generated method stub 

 return 0; 

注意:自定义key,value 如果不实现WritableComparable将会抛出以下异常,
Error: java.lang.NullPointerException
at org.apache.hadoop.mapred.MapTask$MapOutputBuffer.init(MapTask.java:988)
原理:因为文件或许会很大,有策略当容量达到一定大小的时候就会把数据溢出到本地文件,所以需要实现hadoop的序列化接口
统计部分结果:

13480253104 3600 3600 7200 

13502468823 146700 2206980 2353680 

13560436666 35970 256350 292320 

13560439658 40680 117840 158520 

13602846565 38760 58200 96960 

13660577991 139200 13800 153000 

13719199419 4800 0 4800 

13726230503 24810 246810 271620 

13726238888 49620 493620 543240
按总流量降序排序

直接在上面程序的统计结果上,再排序。
排序之前就得弄明白,排序是在什么阶段完成的。看下图
mapReduce的一个核心就在shuffle了,从图上看出,排序是按key来进行排序的,所以从这里入手
这里写图片描述

思路:让把flowInfo设置为key,并覆写compareTo方法,来返回大小
在reducer中输出的时候把key和value再换回来
compareTo:(flowInfo的代码和上面程序的一致,只是compartTo代码变成了下面这样)

 @Override 

 public int compareTo(FlowInfo o) { 

 return this.sum_flow o.getSum_flow()?1:-1; //小于返回1,大于或则等于返回-1 降序 

 }

FlowCountSort

/** 

 * 在上一个程序产生的结果中排序 

 * @author hadoop 

public class FlowCountSort { 

 * 上个文件产生的结果示例:需求:按总流量降序 

 * 13480253104 3600 3600 7200 

 13502468823 146700 2206980 2353680 

 13560436666 35970 256350 292320 

 13560439658 40680 117840 158520 

 public static class FlowMapper extends 

 Mapper LongWritable, Text, FlowInfo, Text { 

 @Override 

 protected void map(LongWritable key, Text value, Context context) 

 throws IOException, InterruptedException { 

 String[] split = StringUtils.split(value.toString(), "/t"); 

 Long up_flow = new Long(split[1]); 

 Long down_flow = new Long(split[2]); 

 context.write(new FlowInfo(up_flow, down_flow),new Text(split[0])); 

 public static class FlowReducer extends 

 Reducer FlowInfo,Text, Text, FlowInfo { 

 @Override 

 protected void reduce(FlowInfo key, Iterable Text values,Context context) 

 throws IOException, InterruptedException { 

 //由于读取的源文件已经是汇总过的了,所以values中只会存在一个元素。 

 Text phone = values.iterator().next(); 

 context.write(phone, key); 

 public static void main(String[] args) throws IOException, 

 ClassNotFoundException, InterruptedException { 

 // 配置信息 

 Configuration conf = new Configuration(); 

 conf.set("fs.defaultFS", "hdfs://192.168.184.141:9000"); 

 conf.set("mapreduce.job.jar", "FlowCountSort.jar"); 

 Job job = Job.getInstance(conf); 

 // 指定jar包 

 job.setJarByClass(FlowCountSort.class); 

 job.setMapperClass(FlowMapper.class); 

 job.setReducerClass(FlowReducer.class); 

 //指定map输出类型 

 job.setMapOutputKeyClass(FlowInfo.class); 

 job.setMapOutputValueClass(Text.class); 

 // 指定reducer输出类型 

 job.setOutputKeyClass(Text.class); 

 job.setOutputValueClass(FlowInfo.class); 

 // 指定数据所在目录 

 FileInputFormat.setInputPaths(job, new Path("/wordCount/output/")); 

 // 制定结果输出目录 

 FileOutputFormat.setOutputPath(job, new Path("/wordCount/output2/")); 

 job.waitForCompletion(true); 

需求:把某一部分号码归为一类统计

从运行的进程上来看,(在运行mapreduce程序的时候,我在每台机器上进行jps查看进程,发现)
影响YarnChild(mapper)数量,和文件的数量有关,而默认YarnChild(reducer)只有一个,一个reducer那么汇总和输出的文件就只有一份

Partitioner编程

HashPartitioner是mapreduce的默认partitioner。计算方法是
which reducer=(key.hashCode() Integer.MAX_VALUE) % numReduceTasks,得到当前的目的reducer。


public class MyPartitioner extends Partitioner Text, FlowInfo { private static HashMap String, Integer map = new HashMap String, Integer static { map.put("135", 0); map.put("136", 1); map.put("137", 2); map.put("138", 3); @Override public int getPartition(Text key, FlowInfo value, int numPartitions) { String str = key.toString().substring(0, 3); if (map.containsKey(str)) { return map.get(str); return 4; // 没找到的分组交给第4组reducer处理

针对第一个程序,的main入口中的job设定Partitioner。

job.setPartitionerClass(MyPartitioner.class); 

job.setNumReduceTasks(5); //设置有5组reduceTasks处理,需要和Partitioner里面预期分组的服务数相同

原创文章,作者:ItWorker,如若转载,请注明出处:https://blog.ytso.com/9633.html

分布式文件系统,分布式数据库区块链并行处理(MPP)数据库,数据挖掘开源大数据平台数据中台数据分析数据开发数据治理数据湖数据采集