MapReduce具体问题(二)
by 伊布
前一篇文章解答了Map任务数、启动人的细节,下面我们解答第二个问题: HDFS的block是否粗暴但忠实的将文件按照64MB分片呢?如果是的话,怎么保证Map获取到的Splits是正确的?具体到wordcount,MR是怎么处理一个单词跨block的情况呢?
我们从Map任务的人口开始说起。前面YARN分析的时候有提到过,AppMaster会将task提交到NodeManager,在NM的container里运行具体的任务。具体到MR来说,运行的任务就是MapTask/ReduceTask。 来看MapTask的runNewMapper:
void runNewMapper(final JobConf job,
final TaskSplitIndex splitIndex,
final TaskUmbilicalProtocol umbilical,
TaskReporter reporter
)
org.apache.hadoop.mapreduce.InputFormat<INKEY,INVALUE> inputFormat =
(org.apache.hadoop.mapreduce.InputFormat<INKEY,INVALUE>) //定义inputFormat
ReflectionUtils.newInstance(taskContext.getInputFormatClass(), job);
..
org.apache.hadoop.mapreduce.RecordReader<INKEY,INVALUE> input =
new NewTrackingRecordReader<INKEY,INVALUE>
(split, inputFormat, reporter, taskContext); //源自inputFormat
..
mapContext =
new MapContextImpl<INKEY, INVALUE, OUTKEY, OUTVALUE>(job, getTaskID(),
input, output, //注意input
committer,
reporter, split);
org.apache.hadoop.mapreduce.Mapper<INKEY,INVALUE,OUTKEY,OUTVALUE>.Context
mapperContext =
new WrappedMapper<INKEY, INVALUE, OUTKEY, OUTVALUE>().getMapContext(
mapContext);
try {
input.initialize(split, mapperContext); //先init
mapper.run(mapperContext); //再run
runNewMapper会先new一个mapContext,然后封装为mapperContext,并将这个context传递给mapper的run方法。显然这里只是封装上下文,并不会处理跨分片。继续来看Mapper类的run方法。 用户会继承Mapper类,实现自己的setup和map方法,而run方法通常直接用Mapper的。来看Map框架的run方法:
public class Mapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT> {
..
public void run(Context context) throws IOException, InterruptedException {
setup(context);
try {
while (context.nextKeyValue()) { //关键点
map(context.getCurrentKey(), context.getCurrentValue(), context);
}
}
}
简单来说就是context不断的nextKeyValue
,得到了KV交给用户自定义的map方法。那么解决问题的关键就在nextKV了。
Mapper的run方法里用的是Context,具体nextKeyValue是在哪个类里定义的,还得回去MapTask里找MapContextImpl
。
public class MapContextImpl<KEYIN,VALUEIN,KEYOUT,VALUEOUT>
..
public MapContextImpl(Configuration conf, TaskAttemptID taskid,
RecordReader<KEYIN,VALUEIN> reader,
RecordWriter<KEYOUT,VALUEOUT> writer,
OutputCommitter committer,
StatusReporter reporter,
InputSplit split) {
super(conf, taskid, writer, committer, reporter);
this.reader = reader; //reader
this.split = split;
}
@Override
public boolean nextKeyValue() throws IOException, InterruptedException {
return reader.nextKeyValue(); //调用reader的nextKeyValue
}
MapTask在创建上下文的时候记录reader类型,等到Mapper.run调用上下文的nextKeyValue的时候,实际调用的是reader的nextKV。
那么reader是谁呢?回到runNewMapper
方法,reader其实就是input,而input的类型就是解析输入文件的后缀名得到的;在wordcount示例里,输入是纯文本文件,实际就是TextInputFormat。
runNewMapper
的NewTrackingRecordReader
调用了TextInputFormat
的createRecordReader
,最终创建了LineRecordReader
对象。
答案就在LineRecordReader里。来看代码:
public class LineRecordReader implements RecordReader<LongWritable, Text> {
public LineRecordReader(Configuration job, FileSplit split, //record初始化方法
byte[] recordDelimiter) throws IOException {
...
public void initialize(InputSplit genericSplit,
TaskAttemptContext context) throws IOException {
...
// If this is not the first split, we always throw away first record
// because we always (except the last split) read one extra line in
// next() method.
if (start != 0) { //只要不是第一个分片,总是跳过第一行,因为前面的block处理的时候,已经越过分区读取完毕了
start += in.readLine(new Text(), 0, maxBytesToConsume(start));
}
this.pos = start; //记录本分split的起始位置
}
...
public boolean nextKeyValue() throws IOException {
if (key == null) {
key = new LongWritable();
}
key.set(pos);
if (value == null) {
value = new Text();
}
int newSize = 0;
// We always read one extra line, which lies outside the upper
// split limit i.e. (end - 1)
while (getFilePosition() <= end || in.needAdditionalRecordAfterSplit()) { //保证到了split末尾时只会一次“超读”
if (pos == 0) {
newSize = skipUtfByteOrderMark();
} else {
newSize = in.readLine(value, maxLineLength, maxBytesToConsume(pos)); //答案:超读
pos += newSize;
}
...
}
还记得刚开始的runNewMapper
里是怎么处理的吗?对,先initialize,再run;run里不停的nextKeyValue。
具体到LineRecordReader,initialize的处理是,如果当前块不是首块,那么就会跳过第一行(Split的划分其实是逻辑上的,只是指定了该文件的start和end位置,而不是真实的划分成小文件),因为第一行已经在前面的块里处理了;
相应的,在NextKeyValue里,由于使用的是readLine,故而总是会读完该文件的一整行(而不是该split),如果是该行跨HDFS分区,那么就读取下一个分区。
答案其实很简单,不过借着这个问题,梳理了下MapTask的流程,虽然有点琐碎,但还是有所收获。 若分析谬误,还请指出:)
Subscribe via RSS