前一篇文章解答了Map任务数、启动人的细节,下面我们解答第二个问题: HDFS的block是否粗暴但忠实的将文件按照64MB分片呢?如果是的话,怎么保证Map获取到的Splits是正确的?具体到wordcount,MR是怎么处理一个单词跨block的情况呢?

我们从Map任务的人口开始说起。前面YARN分析的时候有提到过,AppMaster会将task提交到NodeManager,在NM的container里运行具体的任务。具体到MR来说,运行的任务就是MapTask/ReduceTask。 来看MapTask的runNewMapper:

  void runNewMapper(final JobConf job,
                    final TaskSplitIndex splitIndex,
                    final TaskUmbilicalProtocol umbilical,
                    TaskReporter reporter
                    ) 
    org.apache.hadoop.mapreduce.InputFormat<INKEY,INVALUE> inputFormat =
      (org.apache.hadoop.mapreduce.InputFormat<INKEY,INVALUE>)	//定义inputFormat
        ReflectionUtils.newInstance(taskContext.getInputFormatClass(), job);
..
    org.apache.hadoop.mapreduce.RecordReader<INKEY,INVALUE> input =
      new NewTrackingRecordReader<INKEY,INVALUE>
        (split, inputFormat, reporter, taskContext);	//源自inputFormat
..
    mapContext = 
      new MapContextImpl<INKEY, INVALUE, OUTKEY, OUTVALUE>(job, getTaskID(), 
          input, output, 	//注意input
          committer, 
          reporter, split);

    org.apache.hadoop.mapreduce.Mapper<INKEY,INVALUE,OUTKEY,OUTVALUE>.Context 
        mapperContext = 
          new WrappedMapper<INKEY, INVALUE, OUTKEY, OUTVALUE>().getMapContext(
              mapContext);

    try {
      input.initialize(split, mapperContext);	//先init
      mapper.run(mapperContext);				//再run

runNewMapper会先new一个mapContext,然后封装为mapperContext,并将这个context传递给mapper的run方法。显然这里只是封装上下文,并不会处理跨分片。继续来看Mapper类的run方法。 用户会继承Mapper类,实现自己的setup和map方法,而run方法通常直接用Mapper的。来看Map框架的run方法:

public class Mapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT> {
..
  public void run(Context context) throws IOException, InterruptedException {
    setup(context);
    try {
      while (context.nextKeyValue()) {		//关键点
        map(context.getCurrentKey(), context.getCurrentValue(), context);
      }
    }
  }

简单来说就是context不断的nextKeyValue,得到了KV交给用户自定义的map方法。那么解决问题的关键就在nextKV了。 Mapper的run方法里用的是Context,具体nextKeyValue是在哪个类里定义的,还得回去MapTask里找MapContextImpl

public class MapContextImpl<KEYIN,VALUEIN,KEYOUT,VALUEOUT> 
..
  public MapContextImpl(Configuration conf, TaskAttemptID taskid,
                        RecordReader<KEYIN,VALUEIN> reader,
                        RecordWriter<KEYOUT,VALUEOUT> writer,
                        OutputCommitter committer,
                        StatusReporter reporter,
                        InputSplit split) {
    super(conf, taskid, writer, committer, reporter);
    this.reader = reader;	//reader
    this.split = split;
  }

  @Override
  public boolean nextKeyValue() throws IOException, InterruptedException {
    return reader.nextKeyValue();	//调用reader的nextKeyValue
  }

MapTask在创建上下文的时候记录reader类型,等到Mapper.run调用上下文的nextKeyValue的时候,实际调用的是reader的nextKV。 那么reader是谁呢?回到runNewMapper方法,reader其实就是input,而input的类型就是解析输入文件的后缀名得到的;在wordcount示例里,输入是纯文本文件,实际就是TextInputFormat。 runNewMapperNewTrackingRecordReader调用了TextInputFormatcreateRecordReader,最终创建了LineRecordReader对象。

答案就在LineRecordReader里。来看代码:

public class LineRecordReader implements RecordReader<LongWritable, Text> {
  public LineRecordReader(Configuration job, FileSplit split,	//record初始化方法
      byte[] recordDelimiter) throws IOException {
...
  public void initialize(InputSplit genericSplit,
                         TaskAttemptContext context) throws IOException {
...
    // If this is not the first split, we always throw away first record
    // because we always (except the last split) read one extra line in
    // next() method.
    if (start != 0) {		//只要不是第一个分片,总是跳过第一行,因为前面的block处理的时候,已经越过分区读取完毕了
      start += in.readLine(new Text(), 0, maxBytesToConsume(start));
    }
    this.pos = start;		//记录本分split的起始位置
  }
...
  public boolean nextKeyValue() throws IOException {
    if (key == null) {
      key = new LongWritable();
    }
    key.set(pos);
    if (value == null) {
      value = new Text();
    }
    int newSize = 0;
    // We always read one extra line, which lies outside the upper
    // split limit i.e. (end - 1)
    while (getFilePosition() <= end || in.needAdditionalRecordAfterSplit()) {	//保证到了split末尾时只会一次“超读”
      if (pos == 0) {
        newSize = skipUtfByteOrderMark();
      } else {
        newSize = in.readLine(value, maxLineLength, maxBytesToConsume(pos));	//答案:超读
        pos += newSize;
      }
      ...
    }

还记得刚开始的runNewMapper里是怎么处理的吗?对,先initialize,再run;run里不停的nextKeyValue。 具体到LineRecordReader,initialize的处理是,如果当前块不是首块,那么就会跳过第一行(Split的划分其实是逻辑上的,只是指定了该文件的start和end位置,而不是真实的划分成小文件),因为第一行已经在前面的块里处理了; 相应的,在NextKeyValue里,由于使用的是readLine,故而总是会读完该文件的一整行(而不是该split),如果是该行跨HDFS分区,那么就读取下一个分区。

答案其实很简单,不过借着这个问题,梳理了下MapTask的流程,虽然有点琐碎,但还是有所收获。 若分析谬误,还请指出:)

参考: Hadoop MapReduce中如何处理跨行Block和InputSplit