zl程序教程

您现在的位置是:首页 >  工具

当前栏目

Yarn源码分析之如何确定作业运行方式Uber or Non-Uber?

源码 如何 分析 方式 运行 or 作业 确定
2023-09-27 14:29:33 时间

        在MRAppMaster中,当MapReduce作业初始化时,它会通过作业状态机JobImpl中InitTransition的transition()方法,进行MapReduce作业初始化相关操作,而这其中就包括:

        1、调用createSplits()方法,创建分片,并获取任务分片元数据信息TaskSplitMetaInfo数组taskSplitMetaInfo;

        2、确定Map Task数目numMapTasks:分片元数据信息数组的长度,即有多少分片就有多少numMapTasks;

        3、确定Reduce Task数目numReduceTasks,取作业参数mapreduce.job.reduces,参数未配置默认为0;

        4、根据分片元数据信息计算输入长度inputLength,也就是作业大小;

        5、根据作业大小inputLength,调用作业的makeUberDecision()方法,决定作业运行模式是Uber模式还是Non-Uber模式。

        相关关键代码如下:

 // 调用createSplits()方法,创建分片,并获取任务分片元数据信息TaskSplitMetaInfo数组taskSplitMetaInfo

 TaskSplitMetaInfo[] taskSplitMetaInfo = createSplits(job, job.jobId);

 // 确定Map Task数目numMapTasks:分片元数据信息数组的长度,即有多少分片就有多少numMapTasks

 job.numMapTasks = taskSplitMetaInfo.length;

 // 确定Reduce Task数目numReduceTasks,取作业参数mapreduce.job.reduces,参数未配置默认为0

 job.numReduceTasks = job.conf.getInt(MRJobConfig.NUM_REDUCES, 0);

 // 省略部分代码

 // 根据分片元数据信息计算输入长度inputLength,也就是作业大小

 long inputLength = 0;

 for (int i = 0; i job.numMapTasks; ++i) {

 inputLength += taskSplitMetaInfo[i].getInputDataLength();

 // 根据作业大小inputLength,调用作业的makeUberDecision()方法,决定作业运行模式是Uber模式还是Non-Uber模式

 job.makeUberDecision(inputLength);
        由此,我们可以看出,作业运行方式Uber or Non-Uber是通过Job的makeUberDecision()方法,传入作业大小inputLength来确定的,本文,我们将研究这一话题,即如何确定作业运行方式Uber or Non-Uber?

        在《Yarn源码分析之MRAppMaster:作业运行方式Local、Uber、Non-Uber》一文中我们了解了Uber和Non-Uber两种作业运行方式的含义,如下:

        1、Uber模式:为降低小作业延迟而设计的一种模式,所有任务,不管是Map Task,还是Reduce Task,均在同一个Container中顺序执行,这个Container其实也是MRAppMaster所在Container;

        2、Non-Uber模式:对于运行时间较长的大作业,先为Map Task申请资源,当Map Task运行完成数目达到一定比例后再为Reduce Task申请资源。

        在确定了解上述内容后,我们再来看下Job的makeUberDecision()方法,这个Job的实现为JobImpl类,其makeUberDecision()方法代码如下:

 /**

 * Decide whether job can be run in uber mode based on various criteria.

 * @param dataInputLength Total length for all splits

 private void makeUberDecision(long dataInputLength) {

 //FIXME: need new memory criterion for uber-decision (oops, too late here;

 // until AM-resizing supported,

 // must depend on job client to pass fat-slot needs)

 // these are no longer "system" settings, necessarily; user may override

 // 获取系统Uber模式下允许的最大Map任务数sysMaxMaps,

 // 取参数mapreduce.job.ubertask.maxmaps,参数未配置默认为9

 int sysMaxMaps = conf.getInt(MRJobConfig.JOB_UBERTASK_MAXMAPS, 9);

 // 获取系统Uber模式下允许的最大Reduce任务数sysMaxReduces,

 // 取参数mapreduce.job.ubertask.maxreduces,参数未配置默认为1

 int sysMaxReduces = conf.getInt(MRJobConfig.JOB_UBERTASK_MAXREDUCES, 1);

 // 获取系统Uber模式下允许的任务包含数据量最大字节数sysMaxBytes,

 // mapreduce.job.ubertask.maxbytes,参数未配置默认为远程作业提交路径remoteJobSubmitDir所在文件系统的默认数据块大小

 long sysMaxBytes = conf.getLong(MRJobConfig.JOB_UBERTASK_MAXBYTES,

 fs.getDefaultBlockSize(this.remoteJobSubmitDir)); // FIXME: this is wrong; get FS from

 // [File?]InputFormat and default block size

 // from that

 // 获取系统为Uber模式设置的内存资源单元槽Slot大小sysMemSizeForUberSlot,

 // 取参数yarn.app.mapreduce.am.resource.mb,参数未配置默认为1536M

 long sysMemSizeForUberSlot =

 conf.getInt(MRJobConfig.MR_AM_VMEM_MB,

 MRJobConfig.DEFAULT_MR_AM_VMEM_MB);

 // 获取系统为Uber模式设置的CPU资源单元槽Slot大小sysCPUSizeForUberSlot,

 // 取参数yarn.app.mapreduce.am.resource.cpu-vcores,参数未配置默认为1

 long sysCPUSizeForUberSlot =

 conf.getInt(MRJobConfig.MR_AM_CPU_VCORES,

 MRJobConfig.DEFAULT_MR_AM_CPU_VCORES);

 // 获取系统是否允许Uber模式标志位uberEnabled,

 // 取参数mapreduce.job.ubertask.enable,参数未配置默认为false,不启用

 boolean uberEnabled =

 conf.getBoolean(MRJobConfig.JOB_UBERTASK_ENABLE, false);

 // 判断Map任务数是否满足系统为Uber模式设定的限制条件,结果赋值给smallNumMapTasks

 boolean smallNumMapTasks = (numMapTasks = sysMaxMaps);

 // 判断Reduce任务数是否满足系统为Uber模式设定的限制条件,结果赋值给smallNumReduceTasks

 boolean smallNumReduceTasks = (numReduceTasks = sysMaxReduces);

 // 判断任务包含数据量大小是否满足系统为Uber模式设定的限制条件,结果赋值给smallInput

 boolean smallInput = (dataInputLength = sysMaxBytes);

 // ignoring overhead due to UberAM and statics as negligible here:

 // 获取系统配置的Map任务要求的内存大小requiredMapMB,

 // 取参数mapreduce.map.memory.mb,参数未配置默认为0

 long requiredMapMB = conf.getLong(MRJobConfig.MAP_MEMORY_MB, 0);

 // 获取系统配置的Map任务要求的内存大小requiredReduceMB,

 // 取参数mapreduce.reduce.memory.mb,参数未配置默认为0

 long requiredReduceMB = conf.getLong(MRJobConfig.REDUCE_MEMORY_MB, 0);

 // 计算要求的任务内存大小requiredMB,

 // 取Map任务要求的内存大小requiredMapMB与Reduce任务要求的内存大小requiredReduceMB中的较大者

 long requiredMB = Math.max(requiredMapMB, requiredReduceMB);

 // 获取系统uber模式下Map任务要求的CPU核数requiredMapCores,

 // 取参数mapreduce.map.cpu.vcores,参数未配置默认为1

 int requiredMapCores = conf.getInt(

 MRJobConfig.MAP_CPU_VCORES, 

 MRJobConfig.DEFAULT_MAP_CPU_VCORES);

 // 获取系统uber模式下Reduce任务要求的CPU核数requiredReduceCores,

 // 取参数mapreduce.reduce.cpu.vcores,参数未配置默认为1

 int requiredReduceCores = conf.getInt(

 MRJobConfig.REDUCE_CPU_VCORES, 

 MRJobConfig.DEFAULT_REDUCE_CPU_VCORES);

 // 计算要求的任务需要CPU核数requiredCores,

 // 取Map任务要求的CPU核数requiredMapCores与Reduce任务要求的CPU核数requiredReduceCores中的较大者

 int requiredCores = Math.max(requiredMapCores, requiredReduceCores); 

 // 特殊处理:如果Reduce任务数目为0,即当为Map-Only任务时,

 // 要求的内存大小、CPU核数,以Map任务要求的为准

 if (numReduceTasks == 0) {

 requiredMB = requiredMapMB;

 requiredCores = requiredMapCores;

 // 当MR作业中任务要求的内存大小requiredMB小于等于系统为Uber模式设置的内存资源单元槽Slot大小sysMemSizeForUberSlot时,

 // 或者sysMemSizeForUberSlot被设定为不受限制时,

 // 确定为小内存要求,即标志位smallMemory为true

 boolean smallMemory =

 (requiredMB = sysMemSizeForUberSlot)

 || (sysMemSizeForUberSlot == JobConf.DISABLED_MEMORY_LIMIT);

 // 当MR作业中任务要求的CPU核数requiredCores小于等于系统为Uber模式设置的CPU资源单元槽Slot大小sysCPUSizeForUberSlot时,

 // 确定为小CPU要求,即标志位smallCpu为true

 boolean smallCpu = requiredCores = sysCPUSizeForUberSlot;

 // 确定作业是否为链式作业,并赋值给标志位notChainJob,ture表示非链式作业,false表示为链式作业

 boolean notChainJob = !isChainJob(conf);

 // User has overall veto power over uberization, or user can modify

 // limits (overriding system settings and potentially shooting

 // themselves in the head). Note that ChainMapper/Reducer are

 // fundamentally incompatible with MR-1220; they employ a blocking

 // queue between the maps/reduces and thus require parallel execution,

 // while "uber-AM" (MR AM + LocalContainerLauncher) loops over tasks

 // and thus requires sequential execution.

 // 判断是否为Uber模式,赋值给isUber,

 // 判断的依据为,以下七个条件必须全部满足:

 // 1、参数mapreduce.job.ubertask.enable配置为true,即系统允许Uber模式;

 // 2、Map任务数满足系统为Uber模式设定的限制条件,即小于等于参数mapreduce.job.ubertask.maxmaps配置的值,如果参数未配置,则应该小于等于9;

 // 3、Reduce任务数满足系统为Uber模式设定的限制条件,即小于等于参数mapreduce.job.ubertask.maxreduces配置的值,如果参数未配置,则应该小于等于1;

 // 4、任务包含数据量大小满足系统为Uber模式设定的限制条件,即任务数据量小于等于参数mapreduce.job.ubertask.maxbytes配置的值,如果参数未配置,则应小于等于远程作业提交路径remoteJobSubmitDir所在文件系统的默认数据块大小;

 // 5、MR作业中任务要求的内存大小requiredMB小于等于系统为Uber模式设置的内存资源单元槽Slot大小sysMemSizeForUberSlot时,或者sysMemSizeForUberSlot被设定为不受限制;

 // 6、MR作业中任务要求的CPU核数requiredCores小于等于系统为Uber模式设置的CPU资源单元槽Slot大小sysCPUSizeForUberSlot;

 // 7、作业为非链式作业;

 isUber = uberEnabled smallNumMapTasks smallNumReduceTasks

 smallInput smallMemory smallCpu 

 notChainJob;

 if (isUber) {// 当作业为Uber模式运行时,设置一些必要的参数

 LOG.info("Uberizing job " + jobId + ": " + numMapTasks + "m+"

 + numReduceTasks + "r tasks (" + dataInputLength

 + " input bytes) will run sequentially on single node.");

 // make sure reduces are scheduled only after all map are completed

 // mapreduce.job.reduce.slowstart.completedmaps参数设置为1,

 // 即全部Map任务完成后才会为Reduce任务分配资源

 conf.setFloat(MRJobConfig.COMPLETED_MAPS_FOR_REDUCE_SLOWSTART,

 1.0f);

 // uber-subtask attempts all get launched on same node; if one fails,

 // probably should retry elsewhere, i.e., move entire uber-AM: ergo,

 // limit attempts to 1 (or at most 2? probably not...)

 // Map、Reduce任务的最大尝试次数均为1

 conf.setInt(MRJobConfig.MAP_MAX_ATTEMPTS, 1);

 conf.setInt(MRJobConfig.REDUCE_MAX_ATTEMPTS, 1);

 // disable speculation

 // 禁用Map、Reduce任务的推测执行机制

 conf.setBoolean(MRJobConfig.MAP_SPECULATIVE, false);

 conf.setBoolean(MRJobConfig.REDUCE_SPECULATIVE, false);

 } else {// 当作业为Non-Uber模式时,通过info级别日志,输出作业不能被判定为Uber模式的原因,根据上述7个标志位判断即可

 StringBuilder msg = new StringBuilder();

 msg.append("Not uberizing ").append(jobId).append(" because:");

 if (!uberEnabled)

 // Uber模式开关未打开,这种模式被禁用了

 msg.append(" not enabled;");

 if (!smallNumMapTasks)

 // 有太多的Map任务

 msg.append(" too many maps;");

 if (!smallNumReduceTasks)

 // 有太多的Reduce任务

 msg.append(" too many reduces;");

 if (!smallInput)

 // 有太大的输入

 msg.append(" too much input;");

 if (!smallCpu)

 // 需要占用过多的CPU

 msg.append(" too much CPU;");

 if (!smallMemory)

 // 需要占用过多的内存

 msg.append(" too much RAM;");

 if (!notChainJob)

 // 是一个链式作业,无法使用Uber模式

 msg.append(" chainjob;");

 // 记录无法使用Uber模式的日志信息

 LOG.info(msg.toString());

 }
        makeUberDecision()方法的逻辑十分清晰,但是涉及到的判断条件、参数比较多,总的来说,一个MapReduce是使用Uber模式还是Non-Uber模式运行,要综合考虑以下7个因素,这些条件缺一不可:

        1、 参数mapreduce.job.ubertask.enable配置为true,即系统允许Uber模式,这是一个Uber模式的开关;

        2、Map任务数满足系统为Uber模式设定的限制条件,即小于等于参数mapreduce.job.ubertask.maxmaps配置的值,如果参数未配置,则应该小于等于9;

        3、Reduce任务数满足系统为Uber模式设定的限制条件,即小于等于参数mapreduce.job.ubertask.maxreduces配置的值,如果参数未配置,则应该小于等于1;

        4、任务包含数据量大小满足系统为Uber模式设定的限制条件,即任务数据量小于等于参数mapreduce.job.ubertask.maxbytes配置的值,如果参数未配置,则应小于等于远程作业提交路径remoteJobSubmitDir所在文件系统的默认数据块大小;

        5、MR作业中任务要求的内存大小requiredMB小于等于系统为Uber模式设置的内存资源单元槽Slot大小sysMemSizeForUberSlot时,或者sysMemSizeForUberSlot被设定为不受限制;

        6、MR作业中任务要求的CPU核数requiredCores小于等于系统为Uber模式设置的CPU资源单元槽Slot大小sysCPUSizeForUberSlot;

        7、作业为非链式作业。

        前面6个条件在上面的描述和makeUberDecision()方法代码及其注释中都描述的很清晰,读者可自行查阅。

        下面,我们重点看看第7个条件:作业为非链式作业,这个条件是如何判断的呢?它是通过isChainJob()方法来判断的,代码如下:

 /**

 * ChainMapper and ChainReducer must execute in parallel, so theyre not

 * compatible with uberization/LocalContainerLauncher (100% sequential).

 private boolean isChainJob(Configuration conf) {

 boolean isChainJob = false;

 try {

 // 获取取Map类名mapClassName,取参数mapreduce.job.map.class

 String mapClassName = conf.get(MRJobConfig.MAP_CLASS_ATTR);

 if (mapClassName != null) {

 // 通过Map类名mapClassName获取Map类Class实例mapClass

 Class ? mapClass = Class.forName(mapClassName);

 // 通过Class的isAssignableFrom()方法,看看mapClass是否为ChainMapper的子类,或者就是ChainMapper,

 // 是的话,我们认为它就是一个链式作业

 if (ChainMapper.class.isAssignableFrom(mapClass))

 isChainJob = true;

 } catch (ClassNotFoundException cnfe) {

 // dont care; assume its not derived from ChainMapper

 } catch (NoClassDefFoundError ignored) {

 try {

 // 获取取Reduce类名reduceClassName,取参数mapreduce.job.reduce.class

 String reduceClassName = conf.get(MRJobConfig.REDUCE_CLASS_ATTR);

 if (reduceClassName != null) {

 // 通过Reduce类名reduceClassName获取Reduce类Class实例reduceClass

 Class ? reduceClass = Class.forName(reduceClassName);

 // 通过Class的isAssignableFrom()方法,看看reduceClass是否为ChainReducer的子类,或者就是ChainReducer,

 // 是的话,我们认为它就是一个链式作业

 if (ChainReducer.class.isAssignableFrom(reduceClass))

 isChainJob = true;

 } catch (ClassNotFoundException cnfe) {

 // dont care; assume its not derived from ChainReducer

 } catch (NoClassDefFoundError ignored) {

 return isChainJob;

 }
        它实际上就是看Map或Reduce是否是ChainMapper或ChainReducer的直接或间接子类,或者就是二者,通过参数mapreduce.job.map.class、mapreduce.job.reduce.class取类名并利用Class.forName构造Class实例,然后通过Class的isAssignableFrom()方法判断Map或Reduce是否是ChainMapper或ChainReducer的直接或间接子类,或者就是二者,就是这么简单。

        那么问题又来了,什么是链式作业?为什么继承了ChainMapper或ChainReducer就不能在Uber模式下运行?下面我们一一解答。

        首先,链式作业是什么呢?有些时候,你会发现,一个单独的MapReduce Job无法实现你的业务需求,你需要更多的MapReduce Job来处理你的数据,而此时,将多个MapReduce Job串成一条链就形成一个更大的MapReduce Job,这就是链式作业。而链式作业实现的一个根本条件就是其Mapper或Reducer分别继承自ChainMapper和ChainReducer。

        那么,为什么继承了ChainMapper或ChainReducer就不能在Uber模式下运行?连同什么是ChainMapper、ChainReducer这个问题,我们一起来做一个最直接最简单的解答,更多详细内容请查看关于专门介绍ChainMapper或ChainReducer的文章。

        首先看下ChainMapper的实现,在其内部,有一个Chain类型的成员变量chain,定义并在setup()方法中初始化如下:

 private Chain chain;

 protected void setup(Context context) {

 chain = new Chain(true);

 chain.setup(context.getConfiguration());

 }
        而Chain中有两个最关键的变量,Mapper列表mappers和Thread列表threads如下:

private List Mapper mappers = new ArrayList Mapper 
private List Thread threads = new ArrayList Thread 
        在ChainMapper的run()方法内,会将Chain的mappers中每个Mapper通过chain的addMapper()方法添加至chain中,而chain的addMapper()方法本质上就是基于每个Mapper生成一个MapRunner线程,然后添加到threads列表内,然后再由Mapper启动chain中所有线程threads,关键代码如下:

        ChainMapper的run()方法

 public void run(Context context) throws IOException, InterruptedException {

 setup(context);

 int numMappers = chain.getAllMappers().size();

 if (numMappers == 0) {

 return;

 ChainBlockingQueue Chain.KeyValuePair ?, ? inputqueue;

 ChainBlockingQueue Chain.KeyValuePair ?, ? outputqueue;

 if (numMappers == 1) {

 chain.runMapper(context, 0);

 } else {

 // add all the mappers with proper context

 // add first mapper

 outputqueue = chain.createBlockingQueue();

 chain.addMapper(context, outputqueue, 0);

 // add other mappers

 for (int i = 1; i numMappers - 1; i++) {

 inputqueue = outputqueue;

 outputqueue = chain.createBlockingQueue();

 chain.addMapper(inputqueue, outputqueue, context, i);

 // add last mapper

 chain.addMapper(outputqueue, context, numMappers - 1);

 // start all threads

 chain.startAllThreads();

 // wait for all threads

 chain.joinAllThreads();

 }
        Chain的其中一个addMapper()方法
 /**

 * Add mapper that reads and writes from/to the queue

 @SuppressWarnings("unchecked")

 void addMapper(ChainBlockingQueue KeyValuePair ?, ? input,

 ChainBlockingQueue KeyValuePair ?, ? output,

 TaskInputOutputContext context, int index) throws IOException,

 InterruptedException {

 Configuration conf = getConf(index);

 Class ? keyClass = conf.getClass(MAPPER_INPUT_KEY_CLASS, Object.class);

 Class ? valueClass = conf.getClass(MAPPER_INPUT_VALUE_CLASS, Object.class);

 Class ? keyOutClass = conf.getClass(MAPPER_OUTPUT_KEY_CLASS, Object.class);

 Class ? valueOutClass = conf.getClass(MAPPER_OUTPUT_VALUE_CLASS,

 Object.class);

 RecordReader rr = new ChainRecordReader(keyClass, valueClass, input, conf);

 RecordWriter rw = new ChainRecordWriter(keyOutClass, valueOutClass, output,

 conf);

 MapRunner runner = new MapRunner(mappers.get(index), createMapContext(rr,

 rw, context, getConf(index)), rr, rw);

 threads.add(runner);

 }
        可以看出,ChainMapper实际上实现了一种多重Mapper,即multiple Mapper,它不再依托一个单独的Map Task,执行一种Map任务,而是依托多个Map Task,执行多种Map任务,所以,它肯定不适合Uber模式,因为Uber模式只限于Map、Reduce等各个任务的单线程串行执行。
        ChainReducer也是如此,不再做特别的说明。







基于Knox登录Yarn UI查看SparkStreaming作业兼容性问题说明 问题背景 1.登录EMR集群节点,运行SparkStreaming示例,如下所示(不同版本EMR集群spark-examples_xxx.jar的路径略有差异): [root@emr-header-1 ~]# spark-submit --class org.
都是default惹的祸-yarn调度(一)-fair调度器drf调度策略作业不执行问题的调查和源码分析 问题背景 yarn的fair类型资源池,是企业级hadoop用户常用的资源池类型。该资源池默认的队列调度策略是fair,即分配资源时只考虑内存限制。 对一个多个团队混合使用的大集群来说,如果想要在分配资源时同时考虑内存和cpu限制,需要指定调度策略为drf。
YARN ResourceManager重启作业保留机制 YARN可以通过相关配置支持ResourceManager重启过程中,不影响正在运行的作业,即重启后,作业还能正常继续运行直到结束
Yarn源码分析之MapReduce作业中任务Task调度整体流程(一)         v2版本的MapReduce作业中,作业JOB_SETUP_COMPLETED事件的发生,即作业SETUP阶段完成事件,会触发作业由SETUP状态转换到RUNNING状态,而作业状态转换中涉及作业信息的处理,是由SetupCompletedTransition来完成的,它主要做了...
Yarn源码分析之MRAppMaster:作业运行方式Local、Uber、Non-Uber         基于作业大小因素,MRAppMaster提供了三种作业运行方式:本地Local模式、Uber模式、Non-Uber模式。其中,         1、本地Local模式:通常用于调试;         2、Uber模式:为降低小作业延迟而设计的一种模式,所有任务,不管是Map Ta...