MapReduce比较基础,但是经常会有一些问题不是很清楚,这一系列文章会解答几个经常问的问题。 本文解答第一个问题:是谁决定要起几个Map任务?在什么阶段呢?

还是以wordcount为例。 wordcount客户端在初始化job后调用Job.waitForCompletion方法就结束了,真正提交Job给Yarn的在MapReduce客户端代码的submit里做的。

  public boolean waitForCompletion(boolean verbose
                                   ) throws IOException, InterruptedException,
                                            ClassNotFoundException {
    if (state == JobState.DEFINE) {
      submit();
    }

  public void submit()
        return submitter.submitJobInternal(Job.this, cluster);

  JobStatus submitJobInternal(Job job, Cluster cluster)
      // Create the splits for the job
      LOG.debug("Creating splits at " + jtFs.makeQualified(submitJobDir));
      int maps = writeSplits(job, submitJobDir);
      conf.setInt(MRJobConfig.NUM_MAPS, maps);		//确定MR任务数
      LOG.info("number of splits:" + maps);

从上面的代码不难看出,Map任务的个数是客户端提交任务到YARN之前就决定了的,其个数由writeSplits结果决定。 那么Splits是什么?跟HDFS有什么关系呢?

我们知道,HDFS存储大文件的时候,将文件按照每64MB(或者128MB,可以配置)分为若干个块(Block),每个Block又分了若干个副本,分别存储到不同的datanode上去。而在做MapReduce的时候,我们希望能够获得比较好的数据本地性,也就是说计算任务跟它需要的数据,最好在一个节点上,从而可以显著的减少数据在网络上的传输。 结合上面两点,不难得出一个结论,Map任务的个数(计算任务),实际应该是由MR处理的文件在HDFS上存储的块的个数决定的。

回过头来分析writeSplits,它调用了writeNewSplits。

class JobSubmitter {
  private <T extends InputSplit>
  int writeNewSplits(JobContext job, Path jobSubmitDir) throws IOException,
      InterruptedException, ClassNotFoundException {
    Configuration conf = job.getConfiguration();
    InputFormat<?, ?> input =		//先获取input的格式
      ReflectionUtils.newInstance(job.getInputFormatClass(), conf);

    List<InputSplit> splits = input.getSplits(job);		//根据格式获取分片数

public abstract class InputFormat<K, V> {
  public abstract
    List<InputSplit> getSplits(JobContext context
                               ) throws IOException, InterruptedException;

writeNewSplits先获取input的格式(其实就是根据文件后缀名来判断的,例如gz, bzip2等等),然后再根据对应的格式对象里的getSplits来获取分片数。本例里wordcount要处理的就是一个“纯文本文件”,对应的FileInputFormat是InputFormat的一种。 下面我们以FileInputFormat来看getSplits是怎么获取分片数的。

public abstract class FileInputFormat<K, V> extends InputFormat<K, V> {
  protected boolean isSplitable(JobContext context, Path filename) {
    return true;
  }
  public List<InputSplit> getSplits(JobContext job) throws IOException {
    long minSize = Math.max(getFormatMinSplitSize(), getMinSplitSize(job));	//最小值,1
    long maxSize = getMaxSplitSize(job);	//最大值,Long.MAX_VALUE
..
        if (isSplitable(job, path)) {		//如果输入可分片
          long blockSize = file.getBlockSize();		//HDFS的block大小
          long splitSize = computeSplitSize(blockSize, minSize, maxSize);

          long bytesRemaining = length;
          while (((double) bytesRemaining)/splitSize > SPLIT_SLOP) {
            int blkIndex = getBlockIndex(blkLocations, length-bytesRemaining);
            splits.add(makeSplit(path, length-bytesRemaining, splitSize,
                        blkLocations[blkIndex].getHosts(),		//注意getHosts
                        blkLocations[blkIndex].getCachedHosts()));
            bytesRemaining -= splitSize;
          }

          if (bytesRemaining != 0) {	//最后一个小分片
            int blkIndex = getBlockIndex(blkLocations, length-bytesRemaining);
            splits.add(makeSplit(path, length-bytesRemaining, bytesRemaining,
                       blkLocations[blkIndex].getHosts(),
                       blkLocations[blkIndex].getCachedHosts()));
          }
        } else { // not splitable		//不可分片的输入
          splits.add(makeSplit(path, 0, length, blkLocations[0].getHosts(),
                      blkLocations[0].getCachedHosts()));
        }

注意getSplits时有个“可分片”的概念,也就是isSplitable。分片的意思是,对于单独一个分片,可以独立的拿给对应的Map任务处理,分片之间无依赖。 是否所有的Input都是可以分片的呢?对于纯文本来说,从任何一个位置都是可以直接读取的,显然是可分片的;但是对于gz压缩的文件来说,如果我们只拿到了一个分片,是没办法只解压这一个分片的(因为gz压缩是将压缩信息存储到所有分片的,只有一个分片不能得到所有的压缩信息,无法解压)。hadoop的做法其实就是根据文件后缀名来查找codec,然后检查是否codec instanceof SplittableCompressionCodec;。isSplitable是可以@override的。 可分片的Input处理方法:

  • 确定分片大小。通过file.getBlockSize()获取HDFS的block大小,然后跟MR自己定义的最大最小值比较来选择一个中间值。MR的最大最小值默认是Long.MAX_VALUE/1,所在wordcount里确定的split大小实际就是HDFS的blck大小。
  • 根据文件的总大小,先分整个split大小的,剩余的单独一个分片。每个分片都add到splits数组,add时会获取该block所在的节点地址(有缓存的时候优先使用缓存节点),提交Job后在分配Map任务时会参考这个节点地址,这样就可以将Map任务分配到数据所在的节点上(数据本地)。

对于不可分片的Input处理方法: 只add一次,节点就是第一个分片的节点地址。所以对于不可分片的文件,其实是损失了MR的并行运行,多个分片也需要跨节点拷贝,效率比较低。

具体到wordcount,由于我们输入的文件是纯文本的,所以可以分片;而客户端会根据HDFS各个分片所在的节点地址分别添加到待启动的各个MR任务上,任务总数会写到MRJobConfig.NUM_MAPS,提交了以后使用。

至此,我们明确了Map任务个数的确定者以及其确定阶段。 下期预告:第二个问题也是大家经常问的一个,HDFS的block是否粗暴但忠实的将文件按照64MB分片呢?如果是的话,怎么保证Map获取到的Splits是正确的?具体到wordcount,MR是怎么处理一个单词跨block的情况呢?