zl程序教程

您现在的位置是:首页 >  其他

当前栏目

MapReduce之join案例详解!

案例 详解 Join MapReduce
2023-09-27 14:25:57 时间
一、需求分析

MapReduce提供了表连接操作其中包括Map端join、Reduce端join还有半连接 现在我们要讨论的是Map端join Map端join是指数据到达map处理函数之前进行合并的 效率要远远高于Reduce端join 因为Reduce端join是把所有的数据都经过Shuffle 非常消耗资源。


案例分析

一个电商网站后台数据存在两个表 可以看为两个文件

用户表信息 用户ID、用户名、电话

订单表信息 订单ID、用户ID、商品价格、商品名

如果想把两张表关联成 用户ID、用户名、电话、订单ID 价格 商品名 并且按照需求对其输出。


在关系型数据库中 我们可以通过简单的sql语句实现

 **customer

 cid cname telphone

 **order

 oid cid price pname

 cid cname teplone pname price

 (select * from customer a,order b where a.cid b.cid)

那么在mapreduce我们如何实现 根据不同场景 可以选用不同的方案 下面我们详细了解不同的方法。

我们先准备好数据集

customer - 数据集

 1,jone,13423459976

 2,ben,15099871134

 3,henry,13599187709

 4,tony,13399008876

order - 数据集

 100,1,45.50,product-1

 200,1,23,product-2

 300,1,50,product-3

 400,1,99,product-4

 102,2,19.9,product-5

 103,2,33,product-6

 104,3,44,product-7

 105,4,1009,product-8

 106,5,22,product-9

MR中 多数据集关联 关键是找到KEY 两个方案来实现join:


二、Map join实现

Map join实现的思路 Map side join是针对以下场景进行的优化 两个待连接表中 有一个表非常大 而另一个表非常小 以至于小表可以直接存放到内存中。这样 我们可以将小表复制多 份 让每个map task内存中存在一份 比如存放到hash table中 然后只扫描大表 对于大表中的每一条记录key/value 在hash table中查找是否有相同的key的记录 如果有 则连接后输出即可。


应用场景 两个待连接表中 有一个表非常大 而另一个表非常小 比如就电商中的用户订单表和用户表 一个用户可以有多个订单 这样一对多的关系 就会导致用户表数据量较小 订单表数据量较大 我们可以把用户表放入缓存中List,Map 将订单表放入map端 map()输入数据与内存中的数据进行匹配 如果匹配上 就输出他们的关联合并数据。


较少数据集放入内存中List,Map

Map key,value Map cid,customerInfo 

map()输入数据与内存中的数据进行匹配 如果匹配上 就输出他们的关联合并数据
read order files - cid,orderInfo - find in Map

实现代码

package com.kfk.hadoop.mr.join;

import org.apache.hadoop.conf.Configuration;

import org.apache.hadoop.conf.Configured;

import org.apache.hadoop.fs.FileSystem;

import org.apache.hadoop.fs.Path;

import org.apache.hadoop.io.IntWritable;

import org.apache.hadoop.io.LongWritable;

import org.apache.hadoop.io.Text;

import org.apache.hadoop.mapreduce.Job;

import org.apache.hadoop.mapreduce.Mapper;

import org.apache.hadoop.mapreduce.Reducer;

import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;

import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import org.apache.hadoop.util.Tool;

import org.apache.hadoop.util.ToolRunner;

import java.io.BufferedReader;

import java.io.IOException;

import java.io.InputStream;

import java.io.InputStreamReader;

import java.net.URI;

import java.net.URISyntaxException;

import java.util.HashMap;

import java.util.Map;

 * author : 蔡政洁

 * email :caizhengjie888 icloud.com

 * date : 2020/10/9

 * time : 7:07 下午

public class DistributedCache extends Configured implements Tool {

 static Map String,String customerMap new HashMap String, String 

 * map

 * TODO

 public static class TemplateMapper extends Mapper LongWritable, Text,Text, Text {

 private static final Text outputKey new Text();

 private static final Text outputValue new Text();


Override public void setup(Context context) throws IOException, InterruptedException { // 创建configuration Configuration configuration context.getConfiguration(); // 获取要将数据缓存到内存中数据的路径 URI[] uri Job.getInstance(configuration).getCacheFiles(); Path path new Path(uri[0]); FileSystem fileSystem FileSystem.get(configuration); // 创建输入流 InputStream inputStream fileSystem.open(path); InputStreamReader inputStreamReader new InputStreamReader(inputStream); BufferedReader bufferedReader new BufferedReader(inputStreamReader); // 读取每一行数据 String line null; while ((line bufferedReader.readLine()) ! null){ // 如果每一行的长度大于0 将数据放入map中 if (line.trim().length() 0){ // 数据按照 , 分开 第一个作为map的key 整行作为value customerMap.put(line.split( , )[0],line); // 关闭流 bufferedReader.close(); inputStream.close(); inputStreamReader.close(); Override public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { // 将map端输入每一行的value转化为字符串 String linevalue value.toString(); // 将每一行数据按 , 分开 String[] lines linevalue.split( , 解析数据 将每一行的数据放入到一个数组中 取出数组中的第二个元素 customerMap.get()方法返回的value 如果value不为空 map端的outputKey就为数组中的第二个元素 map端的outputValue就为customerMap的value map输入端的value eg:1 (1,jone,13423459976) (400,1,99,product-4) if (customerMap.get(lines[1]) ! null){ // outputKey的数据模型是cid outputKey.set(lines[1]); // outputValue的数据模型是orderInfo customerInfo outputValue.set(customerMap.get(lines[1]) linevalue); System.out.println(outputKey --- outputValue); context.write(outputKey,outputValue); Override public void cleanup(Context context) throws IOException, InterruptedException { // TODO
public static class TemplateReducer extends Reducer Text,IntWritable,Text,IntWritable { Override public void setup(Context context) throws IOException, InterruptedException { // TODO Override public void reduce(Text key, Iterable IntWritable values, Context context) throws IOException, InterruptedException { // TODO Override public void cleanup(Context context) throws IOException, InterruptedException { // TODO * run * param args * return * throws IOException * throws ClassNotFoundException * throws InterruptedException public int run(String[] args) throws IOException, ClassNotFoundException, InterruptedException, URISyntaxException { // 1) get conf Configuration configuration this.getConf(); // 2) create job Job job Job.getInstance(configuration,this.getClass().getSimpleName()); job.setJarByClass(this.getClass()); // 3.1) input 指定job的输入 Path path new Path(args[0]); FileInputFormat.addInputPath(job,path); // 3.2) map 指定job的mapper和输出的类型 job.setMapperClass(TemplateMapper.class); job.setMapOutputKeyClass(Text.class); job.setMapOutputValueClass(Text.class); // 1.分区 // job.setPartitionerClass(); // 2.排序 // job.setSortComparatorClass(); // 3.combiner -可选项 // job.setCombinerClass(WordCountCombiner.class); // 4.compress -可配置 // configuration.set( mapreduce.map.output.compress , true // 使用的SnappyCodec压缩算法 // configuration.set( mapreduce.map.output.compress.codec , org.apache.hadoop.io.compress.SnappyCodec // 5.分组 // job.setGroupingComparatorClass(); // 6.设置reduce的数量 // job.setNumReduceTasks(2); // 3.3) reduce 指定job的reducer和输出类型 // job.setReducerClass(TemplateReducer.class); // job.setOutputKeyClass(Text.class); // job.setOutputValueClass(IntWritable.class);
hdfs://bigdata-pro-m01:9000/user/caizhengjie/datas/order.txt , hdfs://bigdata-pro-m01:9000/user/caizhengjie/mr/output , hdfs://bigdata-pro-m01:9000/user/caizhengjie/datas/customer.txt // WordCountUpMR wordCountUpMR new WordCountUpMR(); Configuration configuration new Configuration(); try { // 判断输出的文件存不存在 如果存在就将它删除 Path fileOutPath new Path(args[1]); FileSystem fileSystem FileSystem.get(configuration); if (fileSystem.exists(fileOutPath)){ fileSystem.delete(fileOutPath,true); // 调用run方法 int status ToolRunner.run(configuration,new DistributedCache(),args); // 退出程序 System.exit(status); } catch (IOException e) { e.printStackTrace(); } catch (ClassNotFoundException e) { e.printStackTrace(); } catch (InterruptedException e) { e.printStackTrace(); } catch (Exception e) { e.printStackTrace();

运行结果

1 1,jone,13423459976400,1,99,product-4

1 1,jone,13423459976300,1,50,product-3

1 1,jone,13423459976200,1,23,product-2

1 1,jone,13423459976100,1,45.50,product-1

2 2,ben,15099871134103,2,33,product-6

2 2,ben,15099871134102,2,19.9,product-5

3 3,henry,13599187709104,3,44,product-7

4 4,tony,13399008876105,4,1009,product-8


三、Reduce join实现

Reduce join实现思路 相同key的value值合并在一起 所有的数据集都作为map的输入。 在map阶段 map函数同时读取两个文件customer.txt和order.txt 为了区分两种来源的key/value数据对 对每条数据打一个标签tag 比如 tag customer表示来自文件customer.txt tag order表示来自文件order.txt。即 map阶段的主要任务是对不同文件中的数据打标签。在reduce阶段 reduce函数获取key相同的来自customer.txt和order.txt文件的value list 然后对于同一个key 对customer.txt和order.txt中的数据进行join 笛卡尔乘积 。即 reduce阶段进行实际的连接操作。


map端

 **customer

 cid,customerInfo - cid,customerInfo(tag:customer data:cinfo) 


cid,orderInfo - cid,orderInfo(tag:order data:oInfo)

reduce端

 cid,List(customerInfo orderInfo) 

计算过程

25.png自定义数据类型代码

package com.kfk.hadoop.mr.join;

import org.apache.hadoop.io.Writable;

import java.io.DataInput;

import java.io.DataOutput;

import java.io.IOException;

 * author : 蔡政洁

 * email :caizhengjie888 icloud.com

 * date : 2020/10/17

 * time : 4:50 下午

public class DataJoinWritable implements Writable {

 private String tag;

 private String data;


import org.apache.hadoop.mapreduce.Reducer; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import org.apache.hadoop.util.Tool; import org.apache.hadoop.util.ToolRunner; import java.io.IOException; import java.util.ArrayList; import java.util.List; * author : 蔡政洁 * email :caizhengjie888 icloud.com * date : 2020/10/9 * time : 7:07 下午 public class DataJoinMR extends Configured implements Tool { * map * TODO public static class TemplateMapper extends Mapper LongWritable, Text,Text, DataJoinWritable { // 创建map端输出的key value对象 private static final Text outputKey new Text(); private static final DataJoinWritable outputValue new DataJoinWritable(); Override public void setup(Context context) throws IOException, InterruptedException { // TODO Override public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { // 将map端输入的value转换成数组 String[] values value.toString().split( , // 数据预处理 if ((values.length ! 3) (values.length ! 4)){ return; // customer数据处理 if (values.length 3){ String cid values[0]; String name values[1]; String telephone values[2]; // 将cid作为key outputKey.set(cid); // DataCommon.CUSTOMER表示为tag 并设置为value outputValue.set(DataCommon.CUSTOMER,name , telephone); // order数据处理 if (values.length 4){ String cid values[1]; String price values[2]; String productName values[3]; // 将cid作为key outputKey.set(cid); // DataCommon.ORDER表示为tag 并设置为value outputValue.set(DataCommon.ORDER,price , productName); context.write(outputKey,outputValue); // 打印出outputKey,outputValue的数据格式 System.out.println(outputKey , outputValue); Override public void cleanup(Context context) throws IOException, InterruptedException { // TODO
public static class TemplateReducer extends Reducer Text,DataJoinWritable, NullWritable,Text { // 创建reduce输出value对象 输出的数据类型为Text private static final Text outputValue new Text(); Override public void setup(Context context) throws IOException, InterruptedException { // TODO Override public void reduce(Text key, Iterable DataJoinWritable values, Context context) throws IOException, InterruptedException { // 打印出reduce输入的kv集合 用于本机测试 // List DataJoinWritable list Lists.newArrayList(values); // System.out.println( Reduce in KeyIn: key ValueIn: list); // reduce端输入的数据格式 cid,List(customerInfo,orderInfo,orderInfo,orderInfo) String customerInfo null; // 定义一个orderList的列表 里面存放orderInfo List String orderList new ArrayList String // 从reduce输入的列表中取数据 for (DataJoinWritable dataJoinWritable:values){ // 取出customerInfo的数据 if (DataCommon.CUSTOMER.equals(dataJoinWritable.getTag())){ customerInfo dataJoinWritable.getData(); // 取出orderInfo的数据 else if (DataCommon.ORDER.equals(dataJoinWritable.getTag())){ orderList.add(dataJoinWritable.getData()); // 输出数据 for (String orderInfo : orderList){ // 如果reduce输入的列表中没有customerInfo的数据就筛选掉 if (customerInfo null) { continue; // 设置reduce输出的value outputValue.set(key.toString() , customerInfo , orderInfo); // 输出key类型设置为null context.write(NullWritable.get(),outputValue); Override public void cleanup(Context context) throws IOException, InterruptedException { // TODO * run * param args * return * throws IOException * throws ClassNotFoundException * throws InterruptedException public int run(String[] args) throws IOException, ClassNotFoundException, InterruptedException { // 1) get conf Configuration configuration this.getConf(); // 2) create job Job job Job.getInstance(configuration,this.getClass().getSimpleName()); job.setJarByClass(this.getClass()); // 3.1) input 指定job的输入 Path path new Path(args[0]); FileInputFormat.addInputPath(job,path); // 3.2) map 指定job的mapper和输出的类型 job.setMapperClass(TemplateMapper.class); job.setMapOutputKeyClass(Text.class); job.setMapOutputValueClass(DataJoinWritable.class); // 1.分区 // job.setPartitionerClass(); // 2.排序 // job.setSortComparatorClass(); // 3.combiner -可选项 // job.setCombinerClass(WordCountCombiner.class); // 4.compress -可配置 // configuration.set( mapreduce.map.output.compress , true // 使用的SnappyCodec压缩算法 // configuration.set( mapreduce.map.output.compress.codec , org.apache.hadoop.io.compress.SnappyCodec // 5.分组 // job.setGroupingComparatorClass(); // 6.设置reduce的数量 // job.setNumReduceTasks(2); // 3.3) reduce 指定job的reducer和输出类型 job.setReducerClass(TemplateReducer.class); job.setOutputKeyClass(NullWritable.class); job.setOutputValueClass(Text.class); // 3.4) output 指定job的输出 Path outpath new Path(args[1]); FileOutputFormat.setOutputPath(job,outpath); // 4) commit 执行job boolean isSuccess job.waitForCompletion(true); // 如果正常执行返回0 否则返回1 return (isSuccess) ? 0 : 1; public static void main(String[] args) { // 添加输入 输入参数 args new String[]{ hdfs://bigdata-pro-m01:9000/user/caizhengjie/datas/join/ , hdfs://bigdata-pro-m01:9000/user/caizhengjie/mr/output // WordCountUpMR wordCountUpMR new WordCountUpMR(); Configuration configuration new Configuration(); try { // 判断输出的文件存不存在 如果存在就将它删除 Path fileOutPath new Path(args[1]); FileSystem fileSystem FileSystem.get(configuration); if (fileSystem.exists(fileOutPath)){ fileSystem.delete(fileOutPath,true); // 调用run方法 int status ToolRunner.run(configuration,new DataJoinMR(),args); // 退出程序 System.exit(status); } catch (IOException e) { e.printStackTrace(); } catch (ClassNotFoundException e) { e.printStackTrace(); } catch (InterruptedException e) { e.printStackTrace(); } catch (Exception e) { e.printStackTrace();

运行结果

1,jone,13423459976,99,product-4

1,jone,13423459976,50,product-3

1,jone,13423459976,23,product-2

1,jone,13423459976,45.50,product-1

2,ben,15099871134,33,product-6

2,ben,15099871134,19.9,product-5

3,henry,13599187709,44,product-7

4,tony,13399008876,1009,product-8

总结 blog介绍了两种join方式。这两种join方式适用于不同的场景 其处理效率上的相差还是蛮大的 资源消耗也不相同 其中主要导致因素是网络传输。