zl程序教程

您现在的位置是:首页 >  其他

当前栏目

hive map-reduce个数及合并小文件

文件Map 合并 hive 个数 reduce
2023-09-27 14:27:40 时间

1. map数计算方式

long splitSize = computeSplitSize(blockSize, minSize, maxSize);
protected long computeSplitSize(long blockSize, long minSize,  
                                long maxSize) {  
  return Math.max(minSize, Math.min(maxSize, blockSize));  
}  

/******************************************************/
long minSize = Math.max(getFormatMinSplitSize(), getMinSplitSize(job));

protected long getFormatMinSplitSize() {  
    return 1;  
  }  

public static long getMinSplitSize(JobContext job) {  
    return job.getConfiguration().getLong(SPLIT_MINSIZE, 1L);  
  }  

public static final String SPLIT_MINSIZE =   
    "mapreduce.input.fileinputformat.split.minsize"; //默认值为0,所以默认minSize返回1

/******************************************************/
long maxSize = getMaxSplitSize(job);

public static long getMaxSplitSize(JobContext context) {  
    return context.getConfiguration().getLong(SPLIT_MAXSIZE,   
                                              Long.MAX_VALUE);  
  }  

 public static final String SPLIT_MAXSIZE =   
    "mapreduce.input.fileinputformat.split.maxsize";//默认maxSize返回Long.MAX_VALUE

/******************************************************/   
blockSize为配置文件中的参数dfs.block.size,默认值为128M

 

2. 影响map个数的因素

input的文件总个数,
input的文件大小,
集群设置的文件块大小(目前为128M, 可在Hive中通过set dfs.block.size;命令查看到,该参数不能自定义修改);

举例: 
a)假设input目录下有1个文件a,大小为780M,那么Hadoop会将该文件a分隔成7个块(6个128m的块和1个12m的块),从而产生7个map数
b)假设input目录下有3个文件a,b,c,大小分别为10m,20m,130m,那么hadoop会分隔成4个块(10m,20m,128m,2m),从而产生4个map数

即,如果文件大于块大小(128m),那么会拆分,如果小于块大小,则把该文件当成一个块

 

3.修改map个数

3.1 合并小文件减少map数

a)输入合并。即在Map前合并小文件
set mapred.min.split.size=100000000;
set mapred.max.split.size=100000000;
-- 一个节点上split的至少的大小 ,决定了多个data node上的文件是否需要合并,不知道怎么用
set mapred.min.split.size.per.node=50000000;
-- 一个交换机下split的至少的大小,决定了多个交换机上的文件是否需要合并,不知道怎么用
set mapred.min.split.size.per.rack=50000000;
set hive.input.format=org.apache.hadoop.hive.ql.io.CombineHiveInputFormat;

100000000表示100M
set hive.input.format=org.apache.hadoop.hive.ql.io.CombineHiveInputFormat;这个参数表示执行前进行小文件合并

过期的对应配置可以查询:https://hadoop.apache.org/docs/stable/hadoop-project-dist/hadoop-common/DeprecatedProperties.html

mapred.min.split.size相当于mapreduce.input.fileinputformat.split.minsize,设置哪一个都一样
mapred.max.split.size相当于mapreduce.input.fileinputformat.split.maxsize,设置哪一个都一样
所以根据上面求splitSize的公式,可以知道,需增加mapper数,则减少这些值;需减少mapper数,则增加这些值,一般就配置这两个值就行了。

b)Hive结果合并
hive.merge.mapfiles 在map-only job后合并文件,默认true
hive.merge.mapredfiles 在map-reduce job后合并文件,默认false
hive.merge.size.per.task 合并后每个文件的大小,默认256000000
hive.merge.smallfiles.avgsize 平均文件大小,是决定是否执行合并操作的阈值,默认16000000

Hive在对结果文件进行合并时会执行一个额外的map-only脚本,mapper的数量是文件总大小除以size.per.task参数所得的值,触发合并的条件是:

 

3.2 增加map个数

当一个文件大小不大(假设120m,块大小为128m),但是好多数据(好多行),那么起了一个map处理将特别慢,此时需要增加map数来加快处理
此时可设置:
set mapred.map.tasks=10;
map的最终个数取决于
goalsize=min(totalsize/mapred.reduce.tasks,块大小)
goalsize就相当于splitsize

假设:
input有3个300M的文件,块大小为256M

场景1:
不设置map数,那么会有6个map,分别处理256M 44M 256M 44M 256M 44M
场景2:
map数为2个,goalsize=((900M)/2, 块大小)=256M,map数和处理的数据和场景1一样
场景3:
map数为6个,goalsize=((900M)/6, 块大小)=150,分别处理150 150 150 150 150 150

场景3分布比较均匀合理,不用等待大文件处理完成

 

4.默认reduce计算方式

hive.exec.reducers.bytes.per.reducer(每个reduce任务处理的数据量,默认为1000^3=1G) 
hive.exec.reducers.max(每个任务最大的reduce数,默认为999)
计算reducer数的公式:N=min(参数2,总输入数据量/参数1)
即,如果reduce的输入(map的输出,一个map对应一个reduce?)总大小不超过1G,那么只会有一个reduce任务;

 

5.调整reduce个数

#在设置reduce个数的时候也需要考虑这两个原则:使大数据量利用合适的reduce数;使单个reduce任务处理合适的数据量;

#调整hive.exec.reducers.bytes.per.reducer参数的值;
set hive.exec.reducers.bytes.per.reducer=500000000; (500M)

#调整hive.exec.reducers.max
set hive.exec.reducers.max=15

#设置reduce大小
set mapred.reduce.tasks = 15;

 

6.只有一个reduce的情况

  • 没有group by的汇总,比如把select pt,count(1) from popt_tbaccountcopy_mes where pt = '2012-07-04' group by pt; 写成 select count(1) from popt_tbaccountcopy_mes where pt = '2012-07-04';  这点非常常见,希望大家尽量改写。
  • 用了Order by
  • 有笛卡尔积

通常这些情况下,除了找办法来变通和避免,我暂时没有什么好的办法,因为这些操作都是全局的,所以hadoop不得不用一个reduce去完成;

7.设置map reduce输出参数合并小文件

--设置map端输出进行合并,默认为true  
set hive.merge.mapfiles = true  
--设置reduce端输出进行合并,默认为false  
set hive.merge.mapredfiles = true  
--设置合并文件的大小  
set hive.merge.size.per.task = 256*1000*1000  
--当输出文件的平均大小小于该值时,启动一个独立的MapReduce任务进行文件merge。  
set hive.merge.smallfiles.avgsize=16000000 

 

8.参考链接

  1. http://blog.csdn.net/longzilong216/article/details/20711433
  2. http://www.makaidong.com/%E5%8D%9A%E5%AE%A2%E5%9B%AD%E6%8E%92%E8%A1%8C/16451.shtml
  3. http://younglibin.iteye.com/blog/1929255
  4. http://blog.csdn.net/yycdaizi/article/details/43341239
  5. https://github.com/linkedin/dr-elephant/wiki/Tuning-Tips
  6. http://superlxw1234.iteye.com/blog/1582880
  7. http://www.cnblogs.com/thinkpad/p/5173641.html
  8. https://blog.csdn.net/kanbuqinghuanyizhang/article/details/79132564