Hadoop Learning Notes, Part 4: An Analysis of the Map-Reduce Process


1. The Client (JobClient)

The Map-Reduce process begins with a client submitting a job.
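For context, a job is usually submitted from a driver program like the one below, written against the old org.apache.hadoop.mapred API. It is a minimal pass-through job using the stock IdentityMapper and IdentityReducer; the class name and paths are just examples.

import java.io.IOException;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.lib.IdentityMapper;
import org.apache.hadoop.mapred.lib.IdentityReducer;

// Minimal driver: configure the job, then hand it to JobClient.runJob.
public class MinimalDriver {
  public static void main(String[] args) throws IOException {
    JobConf conf = new JobConf(MinimalDriver.class);
    conf.setJobName("minimal-job");
    conf.setOutputKeyClass(LongWritable.class);   // keys/values produced by the identity mapper
    conf.setOutputValueClass(Text.class);
    conf.setMapperClass(IdentityMapper.class);
    conf.setReducerClass(IdentityReducer.class);
    FileInputFormat.setInputPaths(conf, new Path(args[0]));
    FileOutputFormat.setOutputPath(conf, new Path(args[1]));
    JobClient.runJob(conf);   // blocks, polling and printing the job's progress until it finishes
  }
}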

Submission is done mainly through the static function JobClient.runJob(JobConf):


public static RunningJob runJob(JobConf job) throws IOException {
  // First create a JobClient object
  JobClient jc = new JobClient(job);
  ……
  // Call submitJob to submit the job
  running = jc.submitJob(job);
  JobID jobId = running.getID();
  ……
  while (true) {
    // The while loop keeps fetching the job's status and prints it to the client console
  }
  return running;
}


 

runJob calls JobClient.submitJob, which does the actual submission work:

public RunningJob submitJob(JobConf job) throws FileNotFoundException,
                                InvalidJobConfException, IOException {
  // Get an id for the new job from the JobTracker
  JobID jobId = jobSubmitClient.getNewJobId();
  // Prepare to write everything the job needs to run into HDFS:
  //   the jar with the job's code goes into job.jar
  //   the information about the input splits to process goes into job.split
  //   the consolidated job configuration goes into job.xml
  Path submitJobDir = new Path(getSystemDir(), jobId.toString());
  Path submitJarFile = new Path(submitJobDir, "job.jar");
  Path submitSplitFile = new Path(submitJobDir, "job.split");
  // Upload the jars given with the -libjars command-line option to HDFS
  configureCommandLineOptions(job, submitJobDir, submitJarFile);
  Path submitJobFile = new Path(submitJobDir, "job.xml");
  ……
  // Use the job's InputFormat to compute the input splits; the default type is FileSplit
  InputSplit[] splits =
    job.getInputFormat().getSplits(job, job.getNumMapTasks());

  // Open an output stream and write the input split information into the job.split file
  FSDataOutputStream out = FileSystem.create(fs,
      submitSplitFile, new FsPermission(JOB_FILE_PERMISSION));
  try {
    // job.split contains: a split file header, the split file version, the number of splits,
    // and then, for each input split: the split class name (FileSplit by default), the split
    // length, the split contents (for FileSplit: the file name and the start offset of this
    // split within the file), and the split's locations (i.e. which DataNodes hold it).
    writeSplitsFile(splits, out);
  } finally {
    out.close();
  }
  job.set("mapred.job.split.file", submitSplitFile.toString());
  // The number of map tasks is set to the number of splits
  job.setNumMapTasks(splits.length);
  // Write the job configuration into the job.xml file
  out = FileSystem.create(fs, submitJobFile,
      new FsPermission(JOB_FILE_PERMISSION));
  try {
    job.writeXml(out);
  } finally {
    out.close();
  }
  // Finally, call the JobTracker to actually submit the job
  JobStatus status = jobSubmitClient.submitJob(jobId);
  ……
}
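To make the layout described in the comments above more concrete, here is a small, self-contained sketch that writes one record per split in that order. It only illustrates the field order; it is not the actual writeSplitsFile implementation, and the header value and method names are made up.

import java.io.DataOutputStream;
import java.io.IOException;

// Illustrative only: mirrors the field order described above for job.split.
class SplitFileSketch {
  static void writeSplits(DataOutputStream out, String[] files, long[] starts,
                          long[] lengths, String[][] locations) throws IOException {
    out.writeBytes("SPL");               // split file header (illustrative magic)
    out.writeInt(1);                     // split file version
    out.writeInt(files.length);          // number of splits
    for (int i = 0; i < files.length; i++) {
      out.writeUTF("org.apache.hadoop.mapred.FileSplit"); // split class name
      out.writeLong(lengths[i]);         // split length
      out.writeUTF(files[i]);            // FileSplit contents: file name ...
      out.writeLong(starts[i]);          // ... and the start offset within that file
      out.writeInt(locations[i].length); // split locations: which DataNodes hold the data
      for (String host : locations[i]) {
        out.writeUTF(host);
      }
    }
  }
}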


2. JobTracker

JobTracker runs as a separate JVM. Its main function does two main things: it calls the static function startTracker(new JobConf()) to create a JobTracker object, and then calls JobTracker.offerService() to start serving requests.

In the JobTracker constructor a taskScheduler member is created to schedule jobs; the default is JobQueueTaskScheduler, which schedules jobs in FIFO order.

offerService then calls taskScheduler.start(), which registers two listeners with the JobTracker (i.e. the taskScheduler's taskTrackerManager):

jobQueueJobInProgressListener (a JobQueueJobInProgressListener), which monitors the run state of jobs, and eagerTaskInitializationListener (an EagerTaskInitializationListener), which initializes jobs.

EagerTaskInitializationListener has a thread, JobInitThread, that keeps taking JobInProgress objects from jobInitQueue and calls each object's initTasks function to initialize the job. A simplified sketch of this pattern is given below.
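As a rough illustration of this listener pattern (not the actual Hadoop classes; the names below are made up), the eager initializer boils down to a producer/consumer queue with a background init thread:

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Simplified sketch: jobAdded() enqueues a job, a background thread initializes it.
class EagerInitSketch {
  interface Job { void initTasks(); }                 // stands in for JobInProgress.initTasks()

  private final BlockingQueue<Job> jobInitQueue = new LinkedBlockingQueue<Job>();

  // Called when a job is submitted (the real listener's jobAdded(JobInProgress)).
  void jobAdded(Job job) {
    jobInitQueue.offer(job);
  }

  // The real code runs this loop in a dedicated JobInitThread.
  void startInitThread() {
    Thread t = new Thread(new Runnable() {
      public void run() {
        try {
          while (true) {
            Job job = jobInitQueue.take();            // block until a job arrives
            job.initTasks();                          // create map/reduce/setup/cleanup tasks
          }
        } catch (InterruptedException e) {
          Thread.currentThread().interrupt();
        }
      }
    }, "JobInitThread");
    t.setDaemon(true);
    t.start();
  }
}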

In the previous section the client called JobTracker.submitJob. That function first creates a JobInProgress object and then calls addJob, which contains the following logic:


  synchronized (taskScheduler) {
    jobs.put(job.getProfile().getJobID(), job);
    // Call jobAdded on every listener registered with the JobTracker
    for (JobInProgressListener listener : jobInProgressListeners) {
      listener.jobAdded(job);
    }
  }
}


 

EagerTaskInitializationListener's jobAdded simply adds the JobInProgress object to jobInitQueue, which naturally triggers the job's initialization. The work is done by JobInProgress's initTasks function:


  // Read the job.split file from HDFS to reconstruct the input splits
  String jobFile = profile.getJobFile();
  Path sysDir = new Path(this.jobtracker.getSystemDir());
  FileSystem fs = sysDir.getFileSystem(conf);
  DataInputStream splitFile =
    fs.open(new Path(conf.get("mapred.job.split.file")));
  JobClient.RawSplit[] splits;
  try {
    splits = JobClient.readSplitFile(splitFile);
  } finally {
    splitFile.close();
  }
  // The number of map tasks equals the number of input splits
  numMapTasks = splits.length;
  // Create one TaskInProgress per map task, each responsible for one input split
  maps = new TaskInProgress[numMapTasks];
  for (int i = 0; i < numMapTasks; ++i) {
    inputLength += splits[i].getDataLength();
    maps[i] = new TaskInProgress(jobId, jobFile,
                                 splits[i],
                                 jobtracker, conf, this, i);
  }
  // Map tasks are put into nonRunningMapCache, a Map<Node, List<TaskInProgress>>,
  // so that a map task will preferably be assigned to the Node that holds its input split.
  // nonRunningMapCache is used later when the JobTracker assigns map tasks to TaskTrackers.
  if (numMapTasks > 0) {
    nonRunningMapCache = createCache(splits, maxLevel);
  }

  // Create the reduce tasks
  this.reduces = new TaskInProgress[numReduceTasks];
  for (int i = 0; i < numReduceTasks; i++) {
    reduces[i] = new TaskInProgress(jobId, jobFile,
                                    numMapTasks, i,
                                    jobtracker, conf, this);
    // Reduce tasks go into nonRunningReduces, which is used when the JobTracker
    // assigns reduce tasks to TaskTrackers.
    nonRunningReduces.add(reduces[i]);
  }

  // Create two cleanup tasks, one to clean up the map side and one the reduce side
  cleanup = new TaskInProgress[2];
  cleanup[0] = new TaskInProgress(jobId, jobFile, splits[0],
                                  jobtracker, conf, this, numMapTasks);
  cleanup[0].setJobCleanupTask();
  cleanup[1] = new TaskInProgress(jobId, jobFile, numMapTasks,
                                  numReduceTasks, jobtracker, conf, this);
  cleanup[1].setJobCleanupTask();
  // Create two setup tasks, one to set up the map side and one the reduce side
  setup = new TaskInProgress[2];
  setup[0] = new TaskInProgress(jobId, jobFile, splits[0],
                                jobtracker, conf, this, numMapTasks + 1);
  setup[0].setJobSetupTask();
  setup[1] = new TaskInProgress(jobId, jobFile, numMapTasks,
                                numReduceTasks + 1, jobtracker, conf, this);
  setup[1].setJobSetupTask();
  tasksInited.set(true);  // initialization finished
  ……
}


3. TaskTracker

TaskTracker also runs as a separate JVM. Its main function mainly calls new TaskTracker(conf).run(), and the run function mainly executes the following loop:


  // The TaskTracker process runs this loop for as long as it is alive
  while (running && !shuttingDown) {
    ……
    long now = System.currentTimeMillis();
    // Send a heartbeat to the JobTracker at fixed intervals
    long waitTime = heartbeatInterval - (now - lastHeartbeat);
    if (waitTime > 0) {
      synchronized (finishedCount) {
        if (finishedCount[0] == 0) {
          finishedCount.wait(waitTime);
        }
        finishedCount[0] = 0;
      }
    }
    ……
    // Send the heartbeat to the JobTracker and get its response
    HeartbeatResponse heartbeatResponse = transmitHeartBeat(now);
    ……
    // The response tells this TaskTracker what it needs to do
    TaskTrackerAction[] actions = heartbeatResponse.getActions();
    ……
    if (actions != null) {
      for (TaskTrackerAction action : actions) {
        if (action instanceof LaunchTaskAction) {
          // A new task to run: add the action to the task queue
          addToTaskQueue((LaunchTaskAction) action);
        } else if (action instanceof CommitTaskAction) {
          CommitTaskAction commitAction = (CommitTaskAction) action;
          if (!commitResponses.contains(commitAction.getTaskID())) {
            commitResponses.add(commitAction.getTaskID());
          }
        } else {
          tasksToCleanup.put(action);
        }
      }
    }
  }
  return State.NORMAL;
}


transmitHeartBeat assembles this TaskTracker's status and sends it to the JobTracker over RPC:

private HeartbeatResponse transmitHeartBeat(long now) throws IOException {
  // Every so often the heartbeat also carries counter statistics back to the JobTracker
  boolean sendCounters;
  if (now > (previousUpdate + COUNTER_UPDATE_INTERVAL)) {
    sendCounters = true;
    previousUpdate = now;
  }
  else {
    sendCounters = false;
  }
  ……
  // Report this TaskTracker's current status to the JobTracker
  if (status == null) {
    synchronized (this) {
      status = new TaskTrackerStatus(taskTrackerName, localHostname,
                                     httpPort,
                                     cloneAndResetRunningTaskStatuses(
                                       sendCounters),
                                     failures,
                                     maxCurrentMapTasks,
                                     maxCurrentReduceTasks);
    }
  }
  ……
  // The TaskTracker asks the JobTracker for a new task when it is accepting tasks and
  // either the number of map tasks it is running is below its map slot limit, or the
  // number of reduce tasks it is running is below its reduce slot limit
  boolean askForNewTask;
  long localMinSpaceStart;
  synchronized (this) {
    askForNewTask = (status.countMapTasks() < maxCurrentMapTasks ||
                     status.countReduceTasks() < maxCurrentReduceTasks) &&
                    acceptNewTasks;
    localMinSpaceStart = minSpaceStart;
  }
  ……
  // Send the heartbeat to the JobTracker; this is an RPC call
  HeartbeatResponse heartbeatResponse = jobClient.heartbeat(status,
                                                            justStarted, askForNewTask,
                                                            heartbeatResponseId);
  ……
  return heartbeatResponse;
}


4. JobTracker

When the heartbeat RPC reaches the JobTracker, its heartbeat(TaskTrackerStatus status, boolean initialContact, boolean acceptNewTasks, short responseId) function is invoked:


public synchronized HeartbeatResponse heartbeat(TaskTrackerStatus status,
                                                boolean initialContact, boolean acceptNewTasks,
                                                short responseId) throws IOException {
  ……
  String trackerName = status.getTrackerName();
  ……
  short newResponseId = (short) (responseId + 1);
  ……
  HeartbeatResponse response = new HeartbeatResponse(newResponseId, null);
  List<TaskTrackerAction> actions = new ArrayList<TaskTrackerAction>();
  // If the TaskTracker has asked the JobTracker for a task to run
  if (acceptNewTasks) {
    TaskTrackerStatus taskTrackerStatus = getTaskTracker(trackerName);
    if (taskTrackerStatus == null) {
      LOG.warn("Unknown task tracker polling; ignoring: " + trackerName);
    } else {
      // Setup and cleanup tasks have the highest priority
      List<Task> tasks = getSetupAndCleanupTasks(taskTrackerStatus);
      if (tasks == null) {
        // Otherwise let the task scheduler assign tasks
        tasks = taskScheduler.assignTasks(taskTrackerStatus);
      }
      if (tasks != null) {
        for (Task task : tasks) {
          // Put each task into the actions list, which is returned to the TaskTracker
          expireLaunchingTasks.addNewTask(task.getTaskID());
          actions.add(new LaunchTaskAction(task));
        }
      }
    }
  }
  ……
  int nextInterval = getNextHeartbeatInterval();
  response.setHeartbeatInterval(nextInterval);
  response.setActions(
    actions.toArray(new TaskTrackerAction[actions.size()]));
  ……
  return response;
}


The default FIFO scheduler, JobQueueTaskScheduler, implements assignTasks as follows:

public synchronized List<Task> assignTasks(TaskTrackerStatus taskTracker)
    throws IOException {
  ClusterStatus clusterStatus = taskTrackerManager.getClusterStatus();
  int numTaskTrackers = clusterStatus.getTaskTrackers();
  Collection<JobInProgress> jobQueue = jobQueueJobInProgressListener.getJobQueue();
  int maxCurrentMapTasks = taskTracker.getMaxMapTasks();
  int maxCurrentReduceTasks = taskTracker.getMaxReduceTasks();
  int numMaps = taskTracker.countMapTasks();
  int numReduces = taskTracker.countReduceTasks();
  // Compute the remaining map and reduce load across all running jobs
  int remainingReduceLoad = 0;
  int remainingMapLoad = 0;
  synchronized (jobQueue) {
    for (JobInProgress job : jobQueue) {
      if (job.getStatus().getRunState() == JobStatus.RUNNING) {
        int totalMapTasks = job.desiredMaps();
        int totalReduceTasks = job.desiredReduces();
        remainingMapLoad += (totalMapTasks - job.finishedMaps());
        remainingReduceLoad += (totalReduceTasks - job.finishedReduces());
      }
    }
  }
  // Compute the average load each TaskTracker should carry: the remaining load divided
  // by the number of TaskTrackers, capped by this tracker's slot limits
  int maxMapLoad = 0;
  int maxReduceLoad = 0;
  if (numTaskTrackers > 0) {
    maxMapLoad = Math.min(maxCurrentMapTasks,
                          (int) Math.ceil((double) remainingMapLoad /
                                          numTaskTrackers));
    maxReduceLoad = Math.min(maxCurrentReduceTasks,
                             (int) Math.ceil((double) remainingReduceLoad
                                             / numTaskTrackers));
  }
  ……

  // Map tasks take priority over reduce tasks: if this TaskTracker is running fewer
  // map tasks than its share of the load, assign it a map task
  if (numMaps < maxMapLoad) {
    int totalNeededMaps = 0;
    synchronized (jobQueue) {
      for (JobInProgress job : jobQueue) {
        if (job.getStatus().getRunState() != JobStatus.RUNNING) {
          continue;
        }
        Task t = job.obtainNewMapTask(taskTracker, numTaskTrackers,
                                      taskTrackerManager.getNumberOfUniqueHosts());
        if (t != null) {
          return Collections.singletonList(t);
        }
        ……
      }
    }
  }
  // Only after map tasks are taken care of is a reduce task assigned
  if (numReduces < maxReduceLoad) {
    int totalNeededReduces = 0;
    synchronized (jobQueue) {
      for (JobInProgress job : jobQueue) {
        if (job.getStatus().getRunState() != JobStatus.RUNNING ||
            job.numReduceTasks == 0) {
          continue;
        }
        Task t = job.obtainNewReduceTask(taskTracker, numTaskTrackers,
                                         taskTrackerManager.getNumberOfUniqueHosts());
        if (t != null) {
          return Collections.singletonList(t);
        }
        ……
      }
    }
  }
  return null;
}


From the code above we can see that JobInProgress.obtainNewMapTask hands out map tasks; it mainly calls findNewMapTask, which looks up a TaskInProgress in nonRunningMapCache according to the Node the requesting TaskTracker is on. JobInProgress.obtainNewReduceTask hands out reduce tasks; it mainly calls findNewReduceTask, which takes a TaskInProgress from nonRunningReduces. A simplified sketch of the locality lookup follows.
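The locality-aware lookup in findNewMapTask can be sketched roughly like this. It is a simplified illustration, not the Hadoop source; Node, TaskInProgress and the cache below are stand-ins:

import java.util.List;
import java.util.Map;

// Simplified sketch of locality-aware map-task selection.
class LocalitySketch {
  static class Node { Node parent; }           // stand-in for a cluster topology node
  static class TaskInProgress { }              // stand-in for an unlaunched map task

  // Walk from the TaskTracker's node up the topology (node -> rack -> ...) and take the
  // first cached, not-yet-running map task whose input split lives at that level.
  static TaskInProgress findNewMapTask(Node trackerNode, int maxLevel,
                                       Map<Node, List<TaskInProgress>> nonRunningMapCache) {
    Node key = trackerNode;
    for (int level = 0; level < maxLevel && key != null; ++level, key = key.parent) {
      List<TaskInProgress> cached = nonRunningMapCache.get(key);
      if (cached != null && !cached.isEmpty()) {
        return cached.remove(0);   // a data-local (or rack-local) task
      }
    }
    return null;  // the real implementation then falls back to any remaining task
  }
}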

 

5. TaskTracker

After the heartbeat is sent to the JobTracker, the response may contain LaunchTaskActions for the tasks assigned to this TaskTracker. Each one is added to a queue by addToTaskQueue: a map task goes into mapLauncher and a reduce task into reduceLauncher (both of type TaskLauncher):


private void addToTaskQueue(LaunchTaskAction action) {
  if (action.getTask().isMapTask()) {
    mapLauncher.addToTaskQueue(action);
  } else {
    reduceLauncher.addToTaskQueue(action);
  }
}


TaskLauncher is a thread; its run function takes a TaskInProgress from the queue filled above and calls startNewTask(TaskInProgress tip) to start the task, which in turn mainly calls localizeJob(TaskInProgress tip). A simplified sketch of the launcher loop is given below, followed by the real localizeJob:
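A rough sketch of the launcher loop (an illustration only; the real TaskLauncher also waits for a free map or reduce slot before launching, and the names below are made up):

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Simplified sketch of a TaskLauncher-style consumer thread.
class TaskLauncherSketch extends Thread {
  interface Tip { }                                   // stand-in for TaskInProgress
  interface Starter { void startNewTask(Tip tip); }   // stand-in for TaskTracker.startNewTask

  private final BlockingQueue<Tip> tasksToLaunch = new LinkedBlockingQueue<Tip>();
  private final Starter tracker;

  TaskLauncherSketch(Starter tracker) { this.tracker = tracker; }

  void addToTaskQueue(Tip tip) { tasksToLaunch.offer(tip); }

  @Override
  public void run() {
    try {
      while (!isInterrupted()) {
        Tip tip = tasksToLaunch.take();   // block until a task has been assigned
        // The real launcher first waits until a map or reduce slot is free.
        tracker.startNewTask(tip);        // localize the job and spawn the child JVM
      }
    } catch (InterruptedException e) {
      interrupt();
    }
  }
}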


private void localizeJob(TaskInProgress tip) throws IOException {
  // The first thing to do is copy the task's files from HDFS to the TaskTracker's
  // local file system: job.split, job.xml and job.jar
  Path localJarFile = null;
  Task t = tip.getTask();
  JobID jobId = t.getJobID();
  Path jobFile = new Path(t.getJobFile());
  ……
  Path localJobFile = lDirAlloc.getLocalPathForWrite(
      getLocalJobDir(jobId.toString())
      + Path.SEPARATOR + "job.xml",
      jobFileSize, fConf);
  RunningJob rjob = addTaskToJob(jobId, tip);
  synchronized (rjob) {
    if (!rjob.localized) {
      FileSystem localFs = FileSystem.getLocal(fConf);
      Path jobDir = localJobFile.getParent();
      ……
      // Copy the job configuration file to the local file system
      systemFS.copyToLocalFile(jobFile, localJobFile);
      JobConf localJobConf = new JobConf(localJobFile);
      Path workDir = lDirAlloc.getLocalPathForWrite(
          (getLocalJobDir(jobId.toString())
           + Path.SEPARATOR + "work"), fConf);
      if (!localFs.mkdirs(workDir)) {
        throw new IOException("Mkdirs failed to create "
                              + workDir.toString());
      }
      System.setProperty("job.local.dir", workDir.toString());
      localJobConf.set("job.local.dir", workDir.toString());
      // copy Jar file to the local FS and unjar it.
      String jarFile = localJobConf.getJar();
      long jarFileSize = -1;
      if (jarFile != null) {
        Path jarFilePath = new Path(jarFile);
        localJarFile = new Path(lDirAlloc.getLocalPathForWrite(
            getLocalJobDir(jobId.toString())
            + Path.SEPARATOR + "jars",
            5 * jarFileSize, fConf), "job.jar");
        if (!localFs.mkdirs(localJarFile.getParent())) {
          throw new IOException("Mkdirs failed to create jars directory");
        }
        // Copy job.jar to the local file system
        systemFS.copyToLocalFile(jarFilePath, localJarFile);
        localJobConf.setJar(localJarFile.toString());
        // Write the job's configuration out as job.xml
        OutputStream out = localFs.create(localJobFile);
        try {
          localJobConf.writeXml(out);
        } finally {
          out.close();
        }
        // Unpack job.jar
        RunJar.unJar(new File(localJarFile.toString()),
                     new File(localJarFile.getParent().toString()));
      }
      rjob.localized = true;
      rjob.jobConf = localJobConf;
    }
  }
  // Actually launch the task
  launchTaskForJob(tip, new JobConf(rjob.jobConf));
}


Once all the resources the task needs have been copied to the local machine, launchTaskForJob is called, which in turn calls TaskInProgress's launchTask function:


  localizeTask(task);
  if (this.taskStatus.getRunState() == TaskStatus.State.UNASSIGNED) {
    this.taskStatus.setRunState(TaskStatus.State.RUNNING);
  }
  // Create and start a TaskRunner: a MapTaskRunner for a MapTask,
  // a ReduceTaskRunner for a ReduceTask
  this.runner = task.createRunner(TaskTracker.this, this);
  this.runner.start();
  this.taskStatus.setStartTime(System.currentTimeMillis());
}


TaskRunner is itself a thread; its run function builds the command line for a separate child JVM and then launches it:

  TaskAttemptID taskid = t.getTaskID();
  LocalDirAllocator lDirAlloc = new LocalDirAllocator("mapred.local.dir");
  File jobCacheDir = null;
  if (conf.getJar() != null) {
    jobCacheDir = new File(
        new Path(conf.getJar()).getParent().toString());
  }
  File workDir = new File(lDirAlloc.getLocalPathToRead(
      TaskTracker.getLocalTaskDir(
          t.getJobID().toString(),
          t.getTaskID().toString(),
          t.isTaskCleanupTask())
      + Path.SEPARATOR + MRConstants.WORKDIR,
      conf).toString());
  FileSystem fileSystem;
  Path localPath;
  ……
  // Assemble the classpath
  String baseDir;
  String sep = System.getProperty("path.separator");
  StringBuffer classPath = new StringBuffer();
  // start with same classpath as parent process
  classPath.append(System.getProperty("java.class.path"));
  classPath.append(sep);
  if (!workDir.mkdirs()) {
    if (!workDir.isDirectory()) {
      LOG.fatal("Mkdirs failed to create " + workDir.toString());
    }
  }
  String jar = conf.getJar();
  if (jar != null) {
    // if the job has a jar, add its lib jars and classes to the classpath
    File[] libs = new File(jobCacheDir, "lib").listFiles();
    if (libs != null) {
      for (int i = 0; i < libs.length; i++) {
        classPath.append(sep);  // add libs from jar to classpath
        classPath.append(libs[i]);
      }
    }
    classPath.append(sep);
    classPath.append(new File(jobCacheDir, "classes"));
    classPath.append(sep);
    classPath.append(jobCacheDir);
  }
  ……
  classPath.append(sep);
  classPath.append(workDir);
  // Assemble the java command line and its arguments
  Vector<String> vargs = new Vector<String>();
  File jvm =
    new File(new File(System.getProperty("java.home"), "bin"), "java");
  vargs.add(jvm.toString());
  String javaOpts = conf.get("mapred.child.java.opts", "-Xmx200m");
  javaOpts = javaOpts.replace("@taskid@", taskid.toString());
  String[] javaOptsSplit = javaOpts.split(" ");
  String libraryPath = System.getProperty("java.library.path");
  if (libraryPath == null) {
    libraryPath = workDir.getAbsolutePath();
  } else {
    libraryPath += sep + workDir;
  }
  boolean hasUserLDPath = false;
  for (int i = 0; i < javaOptsSplit.length; i++) {
    if (javaOptsSplit[i].startsWith("-Djava.library.path=")) {
      javaOptsSplit[i] += sep + libraryPath;
      hasUserLDPath = true;
      break;
    }
  }
  if (!hasUserLDPath) {
    vargs.add("-Djava.library.path=" + libraryPath);
  }
  for (int i = 0; i < javaOptsSplit.length; i++) {
    vargs.add(javaOptsSplit[i]);
  }
  // Add the child process's temporary directory
  String tmp = conf.get("mapred.child.tmp", "./tmp");
  Path tmpDir = new Path(tmp);
  if (!tmpDir.isAbsolute()) {
    tmpDir = new Path(workDir.toString(), tmp);
  }
  FileSystem localFs = FileSystem.getLocal(conf);
  if (!localFs.mkdirs(tmpDir) && !localFs.getFileStatus(tmpDir).isDir()) {
    throw new IOException("Mkdirs failed to create " + tmpDir.toString());
  }
  vargs.add("-Djava.io.tmpdir=" + tmpDir.toString());
  // Add classpath.
  vargs.add("-classpath");
  vargs.add(classPath.toString());
  // Log directory
  long logSize = TaskLog.getTaskLogLength(conf);
  vargs.add("-Dhadoop.log.dir=" +
            new File(System.getProperty("hadoop.log.dir")
                     ).getAbsolutePath());
  vargs.add("-Dhadoop.root.logger=INFO,TLA");
  vargs.add("-Dhadoop.tasklog.taskid=" + taskid);
  vargs.add("-Dhadoop.tasklog.totalLogFileSize=" + logSize);
  // The main class of the child process that runs the map or reduce task is Child
  vargs.add(Child.class.getName());  // main of Child
  ……
  // Launch the child process
  jvmManager.launchJvm(this,
      jvmManager.constructJvmEnv(setup, vargs, stdout, stderr, logSize,
                                 workDir, env, pidFile, conf));
}


6. Child

The real map and reduce tasks both run inside the Child process. The main logic of Child's main function is as follows:


  // Get the JvmTask object from the TaskTracker over the network
  JvmTask myTask = umbilical.getTask(jvmId);
  ……
  idleLoopCount = 0;
  task = myTask.getTask();
  taskid = task.getTaskID();
  isCleanup = task.isTaskCleanupTask();
  JobConf job = new JobConf(task.getJobFile());
  TaskRunner.setupWorkDir(job);
  numTasksToExecute = job.getNumTasksToExecutePerJvm();
  task.setConf(job);
  defaultConf.addResource(new Path(task.getJobFile()));
  ……
  // Run the task
  task.run(job, umbilical);  // run the task
  if (numTasksToExecute > 0 && ++numTasksExecuted == numTasksToExecute) {
    break;
  }
}


For a map task, the run function of MapTask does the following:

public void run(final JobConf job, final TaskUmbilicalProtocol umbilical)
    throws IOException {
  // The reporter is used to communicate with the TaskTracker and report progress
  final Reporter reporter = getReporter(umbilical);
  startCommunicationThread(umbilical);
  initialize(job, reporter);
  ……
  // The collector for the map task's output
  int numReduceTasks = conf.getNumReduceTasks();
  MapOutputCollector collector = null;
  if (numReduceTasks > 0) {
    collector = new MapOutputBuffer(umbilical, job, reporter);
  } else {
    collector = new DirectMapOutputCollector(umbilical, job, reporter);
  }
  // Read the input split and, based on its information, create a RecordReader to read the data
  instantiatedSplit = (InputSplit)
    ReflectionUtils.newInstance(job.getClassByName(splitClass), job);
  DataInputBuffer splitBuffer = new DataInputBuffer();
  splitBuffer.reset(split.getBytes(), 0, split.getLength());
  instantiatedSplit.readFields(splitBuffer);
  if (instantiatedSplit instanceof FileSplit) {
    FileSplit fileSplit = (FileSplit) instantiatedSplit;
    job.set("map.input.file", fileSplit.getPath().toString());
    job.setLong("map.input.start", fileSplit.getStart());
    job.setLong("map.input.length", fileSplit.getLength());
  }
  RecordReader rawIn =  // open input
    job.getInputFormat().getRecordReader(instantiatedSplit, job, reporter);
  RecordReader in = isSkipping() ?
    new SkippingRecordReader(rawIn, getCounters(), umbilical) :
    new TrackedRecordReader(rawIn, getCounters());
  job.setBoolean("mapred.skip.on", isSkipping());
  // For a map task, create a MapRunnable; the default is MapRunner
  MapRunnable runner =
    ReflectionUtils.newInstance(job.getMapRunnerClass(), job);
  try {
    // MapRunner's run function reads records from the RecordReader one by one
    // and calls the Mapper's map function on each of them.
    runner.run(in, collector, reporter);
    collector.flush();
  } finally {
    in.close();  // close input
    collector.close();
  }
  done(umbilical);
}


MapRunner's run function reads records from the RecordReader one by one and calls the Mapper's map function on each of them:


public void run(RecordReader<K1, V1> input, OutputCollector<K2, V2> output,
                Reporter reporter)
    throws IOException {
  try {
    K1 key = input.createKey();
    V1 value = input.createValue();
    while (input.next(key, value)) {
      mapper.map(key, value, output, reporter);
      if (incrProcCount) {
        reporter.incrCounter(SkipBadRecords.COUNTER_GROUP,
                             SkipBadRecords.COUNTER_MAP_PROCESSED_RECORDS, 1);
      }
    }
  } finally {
    mapper.close();
  }
}
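Each record the mapper emits goes through MapOutputBuffer.collect, which keeps serialized key/value bytes in a circular in-memory buffer and spills to disk once a soft limit is reached. As a tiny, self-contained illustration of the ring-index bookkeeping (the field names mirror, but are not, the Hadoop fields), consider:

// Simplified sketch of the circular index used by the map-side output buffer.
class RingIndexSketch {
  final int[] kvoffsets;   // one slot per buffered record
  int kvstart = 0;         // start of the records not yet spilled
  int kvend = 0;           // end of the spill range (== kvstart when no spill is running)
  int kvindex = 0;         // next free slot
  final int softRecordLimit;

  RingIndexSketch(int slots, double spillPercent) {
    kvoffsets = new int[slots];
    softRecordLimit = (int) (slots * spillPercent);  // e.g. io.sort.spill.percent
  }

  boolean isFull() {
    // The ring is full when the next free slot would collide with the start position
    return (kvindex + 1) % kvoffsets.length == kvstart;
  }

  boolean softLimitReached() {
    // Number of buffered records, accounting for wrap-around
    int used = (kvindex >= kvend) ? kvindex - kvend
                                  : kvoffsets.length - kvend + kvindex;
    return used >= softRecordLimit;  // time to wake the spill thread
  }
}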


The actual collect function:

public synchronized void collect(K key, V value)
    throws IOException {
  reporter.progress();
  ……
  // The buffer is a ring (circular) data structure
  final int kvnext = (kvindex + 1) % kvoffsets.length;
  spillLock.lock();
  try {
    boolean kvfull;
    do {
      // The ring is full when the next free slot runs into the start position
      kvfull = kvnext == kvstart;
      // Check whether the soft limit has been reached, i.e. whether the buffer
      // should start being written out to disk
      final boolean kvsoftlimit = ((kvnext > kvend)
          ? kvnext - kvend > softRecordLimit
          : kvend - kvnext <= kvoffsets.length - softRecordLimit);
      // If the soft limit is reached, start writing the buffer to disk as a spill file.
      // startSpill mainly notifies the background SpillThread, whose run() calls
      // sortAndSpill() to sort, combine and write the data to disk.
      if (kvstart == kvend && kvsoftlimit) {
        startSpill();
      }
      // If the buffer is completely full, we can only wait until the spill finishes
      if (kvfull) {
        while (kvstart != kvend) {
          reporter.progress();
          spillDone.await();
        }
      }
    } while (kvfull);
  } finally {
    spillLock.unlock();
  }
  try {
    // If the buffer is not full, serialize the key and value into the buffer
    int keystart = bufindex;
    keySerializer.serialize(key);
    final int valstart = bufindex;
    valSerializer.serialize(value);
    int valend = bb.markRecord();
    // Call the configured partitioner to get the partition id for this key/value pair
    final int partition = partitioner.getPartition(key, value, partitions);
    mapOutputRecordCounter.increment(1);
    mapOutputByteCounter.increment(valend >= keystart
        ? valend - keystart
        : (bufvoid - keystart) + valend);
    // Record the partition id and the key/value offsets in the index arrays
    int ind = kvindex * ACCTSIZE;
    kvoffsets[kvindex] = ind;
    kvindices[ind + PARTITION] = partition;
    kvindices[ind + KEYSTART] = keystart;
    kvindices[ind + VALSTART] = valstart;
    kvindex = kvnext;
  } catch (MapBufferTooSmallException e) {
    LOG.info("Record too large for in-memory buffer: " + e.getMessage());
    spillSingleRecord(key, value);
    mapOutputRecordCounter.increment(1);
    return;
  }
}


sortAndSpill sorts the buffered records by partition and writes them to a spill file on disk; its core logic is:

  FSDataOutputStream out = null;
  FSDataOutputStream indexOut = null;
  IFileOutputStream indexChecksumOut = null;
  // Create the spill file on disk
  Path filename = mapOutputFile.getSpillFileForWrite(getTaskID(),
                                                     numSpills, size);
  out = rfs.create(filename);
  ……
  final int endPosition = (kvend > kvstart)
    ? kvend
    : kvoffsets.length + kvend;
  // Sort the buffered data, primarily by partition
  sorter.sort(MapOutputBuffer.this, kvstart, endPosition, reporter);
  int spindex = kvstart;
  InMemValBytes value = new InMemValBytes();
  // Write out the data one partition at a time
  for (int i = 0; i < partitions; ++i) {
    IFile.Writer<K, V> writer = null;
    long segmentStart = out.getPos();
    writer = new Writer<K, V>(job, out, keyClass, valClass, codec);
    // If there is no combiner, write the records directly to the file
    if (null == combinerClass) {
      ……
      writer.append(key, value);
      ++spindex;
    }
    else {
      ……
      // If there is a combiner, combine first (combineAndSpill calls combiner.reduce(…))
      // and then write the result to the file
      combineAndSpill(kvIter, combineInputCounter);
    }
  }
  ……
}


When the map phase finishes, MapOutputBuffer's flush function is called. It calls sortAndSpill once more to write whatever is left in the buffer to disk, and then calls mergeParts to merge the multiple spill files on disk:


  // For each partition
  for (int parts = 0; parts < partitions; parts++) {
    // create the segments to be merged
    List<Segment<K, V>> segmentList =
      new ArrayList<Segment<K, V>>(numSpills);
    TaskAttemptID mapId = getTaskID();
    // Collect, from each spill file, the segment that belongs to the current partition
    for (int i = 0; i < numSpills; i++) {
      final IndexRecord indexRecord =
        getIndexInformation(mapId, i, parts);
      long segmentOffset = indexRecord.startOffset;
      long segmentLength = indexRecord.partLength;
      Segment<K, V> s =
        new Segment<K, V>(job, rfs, filename[i], segmentOffset,
                          segmentLength, codec, true);
      segmentList.add(i, s);
    }
    // Merge all segments belonging to the same partition
    RawKeyValueIterator kvIter =
      Merger.merge(job, rfs,
                   keyClass, valClass,
                   segmentList, job.getInt("io.sort.factor", 100),
                   new Path(getTaskID().toString()),
                   job.getOutputKeyComparator(), reporter);
    // Write the merged segment to the final output file
    long segmentStart = finalOut.getPos();
    Writer<K, V> writer =
      new Writer<K, V>(job, finalOut, keyClass, valClass, codec);
    if (null == combinerClass || numSpills < minSpillsForCombine) {
      Merger.writeFile(kvIter, writer, reporter, job);
    } else {
      combineCollector.setWriter(writer);
      combineAndSpill(kvIter, combineInputCounter);
    }
    ……
  }
}
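Conceptually, Merger.merge is a k-way merge of already-sorted segments. A minimal stand-alone sketch of the idea using a priority queue is shown below; it is an illustration, not the Hadoop implementation. The same kind of merge is used again on the reduce side, whose run function follows:

import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.PriorityQueue;

// Simplified k-way merge over already-sorted segments of (key, value) pairs.
class KWayMergeSketch {
  // A cursor over one sorted segment; ordered by its current key.
  static class Cursor implements Comparable<Cursor> {
    final Iterator<Map.Entry<String, String>> it;
    Map.Entry<String, String> current;
    Cursor(Iterator<Map.Entry<String, String>> it) { this.it = it; this.current = it.next(); }
    boolean advance() { if (!it.hasNext()) return false; current = it.next(); return true; }
    public int compareTo(Cursor o) { return current.getKey().compareTo(o.current.getKey()); }
  }

  // Each segment must already be sorted by key, as every spill file is.
  static void merge(List<Iterator<Map.Entry<String, String>>> segments) {
    PriorityQueue<Cursor> heap = new PriorityQueue<Cursor>();
    for (Iterator<Map.Entry<String, String>> seg : segments) {
      if (seg.hasNext()) heap.add(new Cursor(seg));
    }
    while (!heap.isEmpty()) {
      Cursor c = heap.poll();                      // cursor with the smallest current key
      System.out.println(c.current.getKey() + "\t" + c.current.getValue());
      if (c.advance()) heap.add(c);                // re-add it if its segment has more records
    }
  }
}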


public void run(JobConf job, final TaskUmbilicalProtocol umbilical)
    throws IOException {
  job.setBoolean("mapred.skip.on", isSkipping());
  // A reduce task has three phases: copy, sort and reduce
  if (isMapOrReduce()) {
    copyPhase = getProgress().addPhase("copy");
    sortPhase = getProgress().addPhase("sort");
    reducePhase = getProgress().addPhase("reduce");
  }
  startCommunicationThread(umbilical);
  final Reporter reporter = getReporter(umbilical);
  initialize(job, reporter);
  // Copy phase: ReduceCopier.fetchOutputs fetches the map outputs. It creates several
  // MapOutputCopier threads, whose copyOutput function does the actual copying.
  boolean isLocal = "local".equals(job.get("mapred.job.tracker", "local"));
  if (!isLocal) {
    reduceCopier = new ReduceCopier(umbilical, job);
    if (!reduceCopier.fetchOutputs()) {
      ……
    }
  }
  copyPhase.complete();
  // Sort phase: merge the fetched map outputs until the number of files drops below
  // io.sort.factor, and return an Iterator over the merged key-value pairs
  setPhase(TaskStatus.Phase.SORT);
  statusUpdate(umbilical);
  final FileSystem rfs = FileSystem.getLocal(job).getRaw();
  RawKeyValueIterator rIter = isLocal
    ? Merger.merge(job, rfs, job.getMapOutputKeyClass(),
        job.getMapOutputValueClass(), codec, getMapFiles(rfs, true),
        !conf.getKeepFailedTaskFiles(), job.getInt("io.sort.factor", 100),
        new Path(getTaskID().toString()), job.getOutputKeyComparator(),
        reporter)
    : reduceCopier.createKVIterator(job, rfs, reporter);
  mapOutputFilesOnDisk.clear();
  sortPhase.complete();
  // Reduce phase
  setPhase(TaskStatus.Phase.REDUCE);
  ……
  Reducer reducer = ReflectionUtils.newInstance(job.getReducerClass(), job);
  Class keyClass = job.getMapOutputKeyClass();
  Class valClass = job.getMapOutputValueClass();
  ReduceValuesIterator values = isSkipping() ?
    new SkippingReduceValuesIterator(rIter,
        job.getOutputValueGroupingComparator(), keyClass, valClass,
        job, reporter, umbilical) :
    new ReduceValuesIterator(rIter,
        job.getOutputValueGroupingComparator(), keyClass, valClass,
        job, reporter);
  // Read out one key and its list of values at a time and call the Reducer's reduce function
  while (values.more()) {
    reduceInputKeyCounter.increment(1);
    reducer.reduce(values.getKey(), values, collector, reporter);
    values.nextKey();
    values.informReduceProgress();
  }
  reducer.close();
  out.close(reporter);
  done(umbilical);
}
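To connect this back to user code: the values iterator and collector passed to reduce above are exactly what an old-API Reducer sees. For example, the classic word-count reducer written against the org.apache.hadoop.mapred API:

import java.io.IOException;
import java.util.Iterator;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reducer;
import org.apache.hadoop.mapred.Reporter;

// Sums the counts for one key; invoked once per key by the loop in ReduceTask.run above.
public class WordCountReducer extends MapReduceBase
    implements Reducer<Text, IntWritable, Text, IntWritable> {
  public void reduce(Text key, Iterator<IntWritable> values,
                     OutputCollector<Text, IntWritable> output, Reporter reporter)
      throws IOException {
    int sum = 0;
    while (values.hasNext()) {
      sum += values.next().get();   // add up all values for this key
    }
    output.collect(key, new IntWritable(sum));
  }
}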