MapReduce具体问题(一)
by 伊布
MapReduce比较基础,但是经常会有一些问题不是很清楚,这一系列文章会解答几个经常问的问题。 本文解答第一个问题:是谁决定要起几个Map任务?在什么阶段呢?
还是以wordcount为例。 wordcount客户端在初始化job后调用Job.waitForCompletion方法就结束了,真正提交Job给Yarn的在MapReduce客户端代码的submit里做的。
public boolean waitForCompletion(boolean verbose
) throws IOException, InterruptedException,
ClassNotFoundException {
if (state == JobState.DEFINE) {
submit();
}
public void submit()
return submitter.submitJobInternal(Job.this, cluster);
JobStatus submitJobInternal(Job job, Cluster cluster)
// Create the splits for the job
LOG.debug("Creating splits at " + jtFs.makeQualified(submitJobDir));
int maps = writeSplits(job, submitJobDir);
conf.setInt(MRJobConfig.NUM_MAPS, maps); //确定MR任务数
LOG.info("number of splits:" + maps);
从上面的代码不难看出,Map任务的个数是客户端提交任务到YARN之前就决定了的,其个数由writeSplits结果决定。 那么Splits是什么?跟HDFS有什么关系呢?
我们知道,HDFS存储大文件的时候,将文件按照每64MB(或者128MB,可以配置)分为若干个块(Block),每个Block又分了若干个副本,分别存储到不同的datanode上去。而在做MapReduce的时候,我们希望能够获得比较好的数据本地性,也就是说计算任务跟它需要的数据,最好在一个节点上,从而可以显著的减少数据在网络上的传输。 结合上面两点,不难得出一个结论,Map任务的个数(计算任务),实际应该是由MR处理的文件在HDFS上存储的块的个数决定的。
回过头来分析writeSplits,它调用了writeNewSplits。
class JobSubmitter {
private <T extends InputSplit>
int writeNewSplits(JobContext job, Path jobSubmitDir) throws IOException,
InterruptedException, ClassNotFoundException {
Configuration conf = job.getConfiguration();
InputFormat<?, ?> input = //先获取input的格式
ReflectionUtils.newInstance(job.getInputFormatClass(), conf);
List<InputSplit> splits = input.getSplits(job); //根据格式获取分片数
public abstract class InputFormat<K, V> {
public abstract
List<InputSplit> getSplits(JobContext context
) throws IOException, InterruptedException;
writeNewSplits先获取input的格式(其实就是根据文件后缀名来判断的,例如gz, bzip2等等),然后再根据对应的格式对象里的getSplits来获取分片数。本例里wordcount要处理的就是一个“纯文本文件”,对应的FileInputFormat是InputFormat的一种。 下面我们以FileInputFormat来看getSplits是怎么获取分片数的。
public abstract class FileInputFormat<K, V> extends InputFormat<K, V> {
protected boolean isSplitable(JobContext context, Path filename) {
return true;
}
public List<InputSplit> getSplits(JobContext job) throws IOException {
long minSize = Math.max(getFormatMinSplitSize(), getMinSplitSize(job)); //最小值,1
long maxSize = getMaxSplitSize(job); //最大值,Long.MAX_VALUE
..
if (isSplitable(job, path)) { //如果输入可分片
long blockSize = file.getBlockSize(); //HDFS的block大小
long splitSize = computeSplitSize(blockSize, minSize, maxSize);
long bytesRemaining = length;
while (((double) bytesRemaining)/splitSize > SPLIT_SLOP) {
int blkIndex = getBlockIndex(blkLocations, length-bytesRemaining);
splits.add(makeSplit(path, length-bytesRemaining, splitSize,
blkLocations[blkIndex].getHosts(), //注意getHosts
blkLocations[blkIndex].getCachedHosts()));
bytesRemaining -= splitSize;
}
if (bytesRemaining != 0) { //最后一个小分片
int blkIndex = getBlockIndex(blkLocations, length-bytesRemaining);
splits.add(makeSplit(path, length-bytesRemaining, bytesRemaining,
blkLocations[blkIndex].getHosts(),
blkLocations[blkIndex].getCachedHosts()));
}
} else { // not splitable //不可分片的输入
splits.add(makeSplit(path, 0, length, blkLocations[0].getHosts(),
blkLocations[0].getCachedHosts()));
}
注意getSplits时有个“可分片”的概念,也就是isSplitable。分片的意思是,对于单独一个分片,可以独立的拿给对应的Map任务处理,分片之间无依赖。
是否所有的Input都是可以分片的呢?对于纯文本来说,从任何一个位置都是可以直接读取的,显然是可分片的;但是对于gz压缩的文件来说,如果我们只拿到了一个分片,是没办法只解压这一个分片的(因为gz压缩是将压缩信息存储到所有分片的,只有一个分片不能得到所有的压缩信息,无法解压)。hadoop的做法其实就是根据文件后缀名来查找codec,然后检查是否codec instanceof SplittableCompressionCodec;
。isSplitable是可以@override的。
可分片的Input处理方法:
- 确定分片大小。通过
file.getBlockSize()
获取HDFS的block大小,然后跟MR自己定义的最大最小值比较来选择一个中间值。MR的最大最小值默认是Long.MAX_VALUE/1,所在wordcount里确定的split大小实际就是HDFS的blck大小。 - 根据文件的总大小,先分整个split大小的,剩余的单独一个分片。每个分片都add到splits数组,add时会获取该block所在的节点地址(有缓存的时候优先使用缓存节点),提交Job后在分配Map任务时会参考这个节点地址,这样就可以将Map任务分配到数据所在的节点上(数据本地)。
对于不可分片的Input处理方法: 只add一次,节点就是第一个分片的节点地址。所以对于不可分片的文件,其实是损失了MR的并行运行,多个分片也需要跨节点拷贝,效率比较低。
具体到wordcount,由于我们输入的文件是纯文本的,所以可以分片;而客户端会根据HDFS各个分片所在的节点地址分别添加到待启动的各个MR任务上,任务总数会写到MRJobConfig.NUM_MAPS
,提交了以后使用。
至此,我们明确了Map任务个数的确定者以及其确定阶段。 下期预告:第二个问题也是大家经常问的一个,HDFS的block是否粗暴但忠实的将文件按照64MB分片呢?如果是的话,怎么保证Map获取到的Splits是正确的?具体到wordcount,MR是怎么处理一个单词跨block的情况呢?
Subscribe via RSS