Source Code Analysis of MapReduce Job Submission

Background

Continuing with the basics, let's run the hello-world of MapReduce: wordcount. There are two ways to run an MR program:

  1. Local mode: the Hadoop jars you import already include a local job runner (a local simulator).
  2. Cluster mode: package the program as an ordinary jar (it does not have to be a runnable jar), ship it to the Hadoop cluster, and run it with the hadoop jar command.

Neither mode is the focus of this article. The focus is the final job submission:

job.waitForCompletion(true);

This article analyzes what happens behind that call.
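For context, the driver in question looks roughly like the following sketch (WordCountDriver, WordCountMapper, WordCountReducer and the paths are placeholders of mine, assumed to be defined elsewhere, not taken from this post):

// Driver sketch; mapper/reducer classes and paths are placeholders.
Configuration conf = new Configuration();
Job job = Job.getInstance(conf, "wordcount");
job.setJarByClass(WordCountDriver.class);
job.setMapperClass(WordCountMapper.class);
job.setReducerClass(WordCountReducer.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
FileInputFormat.setInputPaths(job, new Path("e:/input"));
FileOutputFormat.setOutputPath(job, new Path("e:/output"));
// The call this article analyzes:
boolean success = job.waitForCompletion(true);
System.exit(success ? 0 : 1);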

Analysis

Set a breakpoint and step into the waitForCompletion method first:

public boolean waitForCompletion(boolean verbose
                                 ) throws IOException, InterruptedException,
                                          ClassNotFoundException {
  if (state == JobState.DEFINE) {
    submit();
  }
  if (verbose) {
    monitorAndPrintJob();
  } else {
    // get the completion poll interval from the client.
    int completionPollIntervalMillis = 
      Job.getCompletionPollInterval(cluster.getConf());
    while (!isComplete()) {
      try {
        Thread.sleep(completionPollIntervalMillis);
      } catch (InterruptedException ie) {
      }
    }
  }
  return isSuccessful();
}

So waitForCompletion wraps the submit method. Let's look closely at what submit does:

public void submit() 
       throws IOException, InterruptedException, ClassNotFoundException {
  ensureState(JobState.DEFINE);
  setUseNewAPI();
  connect();
  final JobSubmitter submitter = 
      getJobSubmitter(cluster.getFileSystem(), cluster.getClient());
  status = ugi.doAs(new PrivilegedExceptionAction<JobStatus>() {
    public JobStatus run() throws IOException, InterruptedException, 
    ClassNotFoundException {
      return submitter.submitJobInternal(Job.this, cluster);
    }
  });
  state = JobState.RUNNING;
  LOG.info("The url to track the job: " + getTrackingURL());
}

The analysis below walks through this code.

The ensureState(JobState.DEFINE) call verifies the job's current state:

private void ensureState(JobState state) throws IllegalStateException {
  if (state != this.state) {
    throw new IllegalStateException("Job in state "+ this.state + 
                                    " instead of " + state);
  }

  if (state == JobState.RUNNING && cluster == null) {
    throw new IllegalStateException
      ("Job in state " + this.state
       + ", but it isn't attached to any job tracker!");
  }
}

Note that in local mode, state (and this.state) is DEFINE and cluster is null, so this method does not throw.

The setUseNewAPI() method reinterprets old-API settings as new-API settings. The old API lives in packages named mapred, while the new API packages are named mapreduce; this method converts properties configured through the old API into their new-API equivalents.
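A rough sketch of the idea (the property names mapred.mapper.new-api / mapred.reducer.new-api and the old-API key below are my assumption about Hadoop 2.x internals, not something shown in this post):

// Sketch only: if no old-API (mapred.*) mapper/reducer class was configured,
// flag the corresponding phase as using the new (mapreduce) API.
Configuration conf = job.getConfiguration();
if (conf.get("mapred.mapper.class") == null) {
  conf.setBoolean("mapred.mapper.new-api", true);
}
if (conf.get("mapred.reducer.class") == null) {
  conf.setBoolean("mapred.reducer.new-api", true);
}

After setUseNewAPI(), submit() moves on to the connect() method: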

private synchronized void connect()
        throws IOException, InterruptedException, ClassNotFoundException {
  if (cluster == null) {
    cluster = 
      ugi.doAs(new PrivilegedExceptionAction<Cluster>() {
                 public Cluster run()
                        throws IOException, InterruptedException, 
                               ClassNotFoundException {
                   return new Cluster(getConfiguration());
                 }
               });
  }
}

In local mode, cluster is null, so this branch is taken. Here ugi is a UserGroupInformation instance, and afterwards cluster is no longer null (note that doAs receives a callback, which returns a new Cluster()). In Cluster's constructor that takes a Configuration:

public Cluster(Configuration conf) throws IOException {
  this(null, conf);
}

public Cluster(InetSocketAddress jobTrackAddr, Configuration conf) 
    throws IOException {
  this.conf = conf;
  this.ugi = UserGroupInformation.getCurrentUser();
  initialize(jobTrackAddr, conf);
}

Here conf is Configuration: core-default.xml, core-site.xml, mapred-default.xml, mapred-site.xml, yarn-default.xml, yarn-site.xml, hdfs-default.xml, hdfs-site.xml.
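That resource list is simply what Configuration.toString() produces, so it can be reproduced from the driver:

// Printing the job's Configuration lists the resource files it was assembled from.
System.out.println(job.getConfiguration());
// e.g. Configuration: core-default.xml, core-site.xml, mapred-default.xml, ...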

The source of the initialize method is as follows:

private void initialize(InetSocketAddress jobTrackAddr, Configuration conf)
    throws IOException {

  synchronized (frameworkLoader) {
    for (ClientProtocolProvider provider : frameworkLoader) {
      LOG.debug("Trying ClientProtocolProvider : "
          + provider.getClass().getName());
      ClientProtocol clientProtocol = null;
      try {
        if (jobTrackAddr == null) {
          clientProtocol = provider.create(conf);
        } else {
          clientProtocol = provider.create(jobTrackAddr, conf);
        }

        if (clientProtocol != null) {
          clientProtocolProvider = provider;
          client = clientProtocol;
          LOG.debug("Picked " + provider.getClass().getName()
              + " as the ClientProtocolProvider");
          break;
        }
        else {
          LOG.debug("Cannot pick " + provider.getClass().getName()
              + " as the ClientProtocolProvider - returned null protocol");
        }
      }
      catch (Exception e) {
        LOG.info("Failed to use " + provider.getClass().getName()
            + " due to error: ", e);
      }
    }
  }

  if (null == clientProtocolProvider || null == client) {
    throw new IOException(
        "Cannot initialize Cluster. Please check your configuration for "
            + MRConfig.FRAMEWORK_NAME
            + " and the correspond server addresses.");
  }
}

In local mode jobTrackAddr is null, so the provider.create(conf) branch is taken and the protocol obtained is an org.apache.hadoop.mapred.LocalJobRunner. This is where Cluster's two member variables are set:

  1. client is an org.apache.hadoop.mapred.LocalJobRunner
  2. clientProtocolProvider is an org.apache.hadoop.mapred.LocalClientProtocolProvider

If the job were submitted to YARN instead, the client would be a YARNRunner.
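frameworkLoader above is a ServiceLoader over ClientProtocolProvider implementations, and which provider ends up returning a non-null client is driven by the mapreduce.framework.name setting (my understanding of Hadoop 2.x behaviour, so treat the sketch below as an assumption): LocalClientProtocolProvider answers when the framework is local (the default), YarnClientProtocolProvider when it is yarn.

// Sketch: switching the same job from the local runner to YARNRunner is a
// configuration change; the host names below are hypothetical.
Configuration conf = new Configuration();
conf.set("mapreduce.framework.name", "yarn");          // default is "local"
conf.set("yarn.resourcemanager.hostname", "rm-host");  // hypothetical ResourceManager host
conf.set("fs.defaultFS", "hdfs://namenode:9000");      // hypothetical NameNode address
Job job = Job.getInstance(conf, "wordcount");
// connect() would now pick YarnClientProtocolProvider and return a YARNRunner.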

This Cluster is then returned to the submit method. In other words, the connect call inside submit initializes the Job's cluster member as an object holding the client and clientProtocolProvider described above.

Next, look at this part of submit:

final JobSubmitter submitter = 
    getJobSubmitter(cluster.getFileSystem(), cluster.getClient());

Here cluster.getFileSystem() returns org.apache.hadoop.fs.LocalFileSystem@7d7758be, and cluster.getClient() returns the org.apache.hadoop.mapred.LocalJobRunner@2bdd8394 mentioned above. Everything is local.

After ugi.doAs has executed, the returned status is:

job-id : job_local1871012903_0001
uber-mode : false
map-progress : 1.0
reduce-progress : 1.0
cleanup-progress : 1.0
setup-progress : 1.0
runstate : SUCCEEDED
start-time : 0
user-name : yfs
priority : NORMAL
scheduling-info : NA
num-used-slots0
num-reserved-slots0
used-mem0
reserved-mem0
needed-mem0

We can infer that the job must already have run inside the ugi.doAs call above, because map-progress and reduce-progress are both 1.0.

We will come back to the ugi.doAs call later. Finally, the state is changed to RUNNING:

state = JobState.RUNNING;

At the end, submit logs the job's tracking URL:

LOG.info("The url to track the job: " + getTrackingURL());

which prints:

2019-02-19 17:52:36,127 INFO [org.apache.hadoop.mapreduce.Job] - The url to track the job: http://localhost:8080/

Now let's analyze the rest of waitForCompletion. The verbose argument is true, so we enter monitorAndPrintJob(). Its logic is:

/**
 * Monitor a job and print status in real-time as progress is made and tasks
 * fail.
 * @return true if the job succeeded
 * @throws IOException if communication to the JobTracker fails
 */
public boolean monitorAndPrintJob() 
    throws IOException, InterruptedException {
  String lastReport = null;
  Job.TaskStatusFilter filter;
  Configuration clientConf = getConfiguration();
  filter = Job.getTaskOutputFilter(clientConf);
  JobID jobId = getJobID();
  LOG.info("Running job: " + jobId);
  int eventCounter = 0;
  boolean profiling = getProfileEnabled();
  IntegerRanges mapRanges = getProfileTaskRange(true);
  IntegerRanges reduceRanges = getProfileTaskRange(false);
  int progMonitorPollIntervalMillis = 
    Job.getProgressPollInterval(clientConf);
  /* make sure to report full progress after the job is done */
  boolean reportedAfterCompletion = false;
  boolean reportedUberMode = false;
  while (!isComplete() || !reportedAfterCompletion) {
    if (isComplete()) {
      reportedAfterCompletion = true;
    } else {
      Thread.sleep(progMonitorPollIntervalMillis);
    }
    if (status.getState() == JobStatus.State.PREP) {
      continue;
    }
    if (!reportedUberMode) {
      reportedUberMode = true;
      LOG.info("Job " + jobId + " running in uber mode : " + isUber());
    }
    String report = 
      (" map " + StringUtils.formatPercent(mapProgress(), 0)+
          " reduce " + 
          StringUtils.formatPercent(reduceProgress(), 0));
    if (!report.equals(lastReport)) {
      LOG.info(report);
      lastReport = report;
    }

    TaskCompletionEvent[] events = 
      getTaskCompletionEvents(eventCounter, 10);
    eventCounter += events.length;
    printTaskEvents(events, filter, profiling, mapRanges, reduceRanges);
  }
  boolean success = isSuccessful();
  if (success) {
    LOG.info("Job " + jobId + " completed successfully");
  } else {
    LOG.info("Job " + jobId + " failed with state " + status.getState() + 
        " due to: " + status.getFailureInfo());
  }
  Counters counters = getCounters();
  if (counters != null) {
    LOG.info(counters.toString());
  }
  return success;
}

As the Javadoc (or simply the method name) suggests, this method polls the job's status in real time and keeps logging progress until the job completes; it returns true if the job succeeded and false otherwise. The LOG.info("Running job: " + jobId) statement prints the job id, which, as we saw above, is:

2019-02-19 17:58:36,963 INFO [org.apache.hadoop.mapreduce.Job] - Running job: job_local1871012903_0001

Finally, the last line of waitForCompletion returns isSuccessful(), which reports whether status equals SUCCEEDED:

/**
 * Check if the job completed successfully. 
 * 
 * @return <code>true</code> if the job succeeded, else <code>false</code>.
 * @throws IOException
 */
public boolean isSuccessful() throws IOException {
  ensureState(JobState.RUNNING);
  updateStatus();
  return status.getState() == JobStatus.State.SUCCEEDED;
}

Here status is simply a member of Job of type JobStatus. Now back to the submit method, and the most important piece of the submission:

status = ugi.doAs(new PrivilegedExceptionAction<JobStatus>() {
  public JobStatus run() throws IOException, InterruptedException, 
  ClassNotFoundException {
    return submitter.submitJobInternal(Job.this, cluster);
  }
});

Note that, just as with connect, a callback is passed in, so status is undefined until the callback runs. It is not hard to guess that submitJobInternal is what submits the job's split information, the jar, and job.xml. The second argument, cluster, is the org.apache.hadoop.mapreduce.Cluster@4c2bb6e0 described earlier; its client and file system are both local (effectively a local simulator).

The submitJobInternal method starts with:

//validate the jobs output specs 
checkSpecs(job);

checkSpecs contains the line output.checkOutputSpecs(job);, where output is a FileOutputFormat instance, and its checkOutputSpecs method contains this snippet:

if (outDir.getFileSystem(job.getConfiguration()).exists(outDir)) {
  throw new FileAlreadyExistsException("Output directory " + outDir + 
                                       " already exists");
}

That is, it checks whether the output directory already exists, which explains why the output path must not exist before the job runs.
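A common workaround is to delete the output directory in the driver before submitting, so this check cannot fail (a sketch; the path is a placeholder):

// Remove a pre-existing output directory so checkOutputSpecs does not complain.
Path outDir = new Path("e:/output");                           // placeholder path
FileSystem fs = outDir.getFileSystem(job.getConfiguration());
if (fs.exists(outDir)) {
  fs.delete(outDir, true);                                     // true = recursive
}
FileOutputFormat.setOutputPath(job, outDir);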

Then:

Path jobStagingArea = JobSubmissionFiles.getStagingDir(cluster, conf);

When I debugged in local mode, this was

file:/tmp/hadoop-yfs/mapred/staging/yfs688390032/.staging

For example, since my project runs on the F: drive, the files actually land under

F:\tmp\hadoop-yfs\mapred\staging\yfs688390032\.staging

on disk. Next comes further configuration of conf:

//configure the command line options correctly on the submitting dfs
InetAddress ip = InetAddress.getLocalHost();
if (ip != null) {
  submitHostAddress = ip.getHostAddress();
  submitHostName = ip.getHostName();
  conf.set(MRJobConfig.JOB_SUBMITHOST,submitHostName);
  conf.set(MRJobConfig.JOB_SUBMITHOSTADDR,submitHostAddress);
}

This sets the host name and address from which the MR job is submitted. Next, the job id is created and set, and the job submission path is generated:

file:/tmp/hadoop-yfs/mapred/staging/yfs688390032/.staging/job_local688390032_0001

Our project runs on the F: drive, so on disk the path is

F:\tmp\hadoop-yfs\mapred\staging\yfs688390032\.staging

Then further conf properties are set:

conf.set(MRJobConfig.USER_NAME,
    UserGroupInformation.getCurrentUser().getShortUserName());
conf.set("hadoop.http.filter.initializers", 
    "org.apache.hadoop.yarn.server.webproxy.amfilter.AmFilterInitializer");
conf.set(MRJobConfig.MAPREDUCE_JOB_DIR, submitJobDir.toString());

Then comes:

copyAndConfigureFiles(job, submitJobDir);

Its source is:

private void copyAndConfigureFiles(Job job, Path jobSubmitDir) 
    throws IOException {
  JobResourceUploader rUploader = new JobResourceUploader(jtFs);
  rUploader.uploadFiles(job, jobSubmitDir);

  // Get the working directory. If not set, sets it to filesystem working dir
  // This code has been added so that working directory reset before running
  // the job. This is necessary for backward compatibility as other systems
  // might use the public API JobConf#setWorkingDirectory to reset the working
  // directory.
  job.getWorkingDirectory();
}

The final job.getWorkingDirectory() is the project's root path, here file:/F:/xx/learning/mapreduce. The source of rUploader.uploadFiles contains this snippet:

// get all the command line arguments passed in by the user conf
String files = conf.get("tmpfiles");
String libjars = conf.get("tmpjars");
String archives = conf.get("tmparchives");
String jobJar = job.getJar();

rUploader.uploadFiles is mainly responsible for uploading the jar. YARN mode and local mode differ here: YARN mode copies the jar, local mode does not.
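Those tmpfiles / tmpjars / tmparchives keys are normally filled in by GenericOptionsParser (typically via ToolRunner) from the generic -files, -libjars and -archives options; a hedged sketch of that link, with placeholder file names:

// GenericOptionsParser turns -files/-libjars/-archives into the
// tmpfiles/tmpjars/tmparchives keys that uploadFiles reads above.
Configuration conf = new Configuration();
String[] args = { "-files", "dict.txt", "-libjars", "extra.jar", "/input", "/output" };
GenericOptionsParser parser = new GenericOptionsParser(conf, args);
String[] remaining = parser.getRemainingArgs();   // { "/input", "/output" }
System.out.println(conf.get("tmpfiles"));         // URI of dict.txt
System.out.println(conf.get("tmpjars"));          // URI of extra.jar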

The next line in submitJobInternal is:

Path submitJobFile = JobSubmissionFiles.getJobConfPath(submitJobDir);

In my case this is

file:/tmp/hadoop-yfs/mapred/staging/yfs1764451778/.staging/job_local1764451778_0001/job.xml

Then the job split information starts being recorded:

// Create the splits for the job
LOG.debug("Creating splits at " + jtFs.makeQualified(submitJobDir));

In my case this is

file:/tmp/hadoop-yfs/mapred/staging/yfs1764451778/.staging/job_local1764451778_0001

Then comes the actual splitting:

int maps = writeSplits(job, submitJobDir);

The returned maps is the number of splits. Inside writeSplits the relevant source is:

maps = writeNewSplits(job, jobSubmitDir);

Stepping further in, we find:

List<InputSplit> splits = input.getSplits(job);

Stepping in once more brings us to org.apache.hadoop.mapreduce.lib.input.FileInputFormat.getSplits(JobContext), which contains:

long minSize = Math.max(getFormatMinSplitSize(), getMinSplitSize(job));
long maxSize = getMaxSplitSize(job);

getFormatMinSplitSize() simply returns 1, so minSize is effectively the result of getMinSplitSize(job), whose source is:

public static long getMinSplitSize(JobContext job) {
  return job.getConfiguration().getLong(SPLIT_MINSIZE, 1L);
}

Here SPLIT_MINSIZE is mapreduce.input.fileinputformat.split.minsize and 1L is the default value. So if mapreduce.input.fileinputformat.split.minsize is set in the configuration, that value is used; otherwise it is 1. The source of getMaxSplitSize is:

public static long getMaxSplitSize(JobContext context) {
  return context.getConfiguration().getLong(SPLIT_MAXSIZE, 
                                            Long.MAX_VALUE);
}

That is, by default there is no upper bound on the split size (the block size ends up being the effective cap), unless you configure

SPLIT_MAXSIZE = “mapreduce.input.fileinputformat.split.maxsize”
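If you do want to bound the split size, both limits can be set from the driver (the sizes below are arbitrary examples):

// Either via the raw configuration keys ...
job.getConfiguration().setLong("mapreduce.input.fileinputformat.split.minsize", 1L);
job.getConfiguration().setLong("mapreduce.input.fileinputformat.split.maxsize", 64L * 1024 * 1024);
// ... or via the FileInputFormat helpers, which set the same keys.
FileInputFormat.setMinInputSplitSize(job, 1L);
FileInputFormat.setMaxInputSplitSize(job, 64L * 1024 * 1024);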

So now we have both the maximum and minimum split sizes. Next comes:

// generate splits
List<InputSplit> splits = new ArrayList<InputSplit>();
List<FileStatus> files = listStatus(job);
for (FileStatus file: files) {
  ...

This is where split generation begins; files here are our input files. From the per-file processing inside the loop we can see how the split size (splitSize) is determined:

if (isSplitable(job, path)) {
  long blockSize = file.getBlockSize();
  long splitSize = computeSplitSize(blockSize, minSize, maxSize);
  long bytesRemaining = length;
  while (((double) bytesRemaining)/splitSize > SPLIT_SLOP) {
    int blkIndex = getBlockIndex(blkLocations, length-bytesRemaining);
    splits.add(makeSplit(path, length-bytesRemaining, splitSize,
               blkLocations[blkIndex].getHosts(),
               blkLocations[blkIndex].getCachedHosts()));
    bytesRemaining -= splitSize;
  }
  ...

Note the isSplitable check: some files cannot be split at all. Also, SPLIT_SLOP above is 1.1: once the remaining bytes divided by splitSize (which, as noted, is 32 MB in local mode if nothing is configured) is <= 1.1, no further cut is made; for example, a 129 MB file on a YARN cluster produces only one split rather than two. The split information returned by this method looks like:

file:/e:/input/hello.txt:0+260

That is, this split is 260 bytes long.

Here computeSplitSize is:

protected long computeSplitSize(long blockSize, long minSize,
                                long maxSize) {
  return Math.max(minSize, Math.min(maxSize, blockSize));
}

So in the usual case, with neither mapreduce.input.fileinputformat.split.maxsize nor mapreduce.input.fileinputformat.split.minsize configured, computeSplitSize simply returns the block size, i.e. 32 MB (note that, unlike on YARN, the local-mode block size is 32 MB rather than 128 MB). Finally, the second-to-last line of writeNewSplits,

JobSplitWriter.createSplitFiles(jobSubmitDir, conf, 
    jobSubmitDir.getFileSystem(conf), array);

creates the job split files in the .staging directory. They are not text files, however.
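Before moving on, here is a worked sketch of the split arithmetic with assumed numbers (32 MB local-mode block size, default min/max, and a hypothetical 33 MB input file):

// computeSplitSize: max(minSize, min(maxSize, blockSize))
long minSize   = 1L;                  // default split.minsize
long maxSize   = Long.MAX_VALUE;      // default split.maxsize
long blockSize = 32L * 1024 * 1024;   // local-mode block size
long splitSize = Math.max(minSize, Math.min(maxSize, blockSize));  // = 32 MB

// SPLIT_SLOP rule applied to a hypothetical 33 MB file:
final double SPLIT_SLOP = 1.1;
long bytesRemaining = 33L * 1024 * 1024;
int numSplits = 0;
while (((double) bytesRemaining) / splitSize > SPLIT_SLOP) {  // 33/32 ≈ 1.03, not > 1.1
  numSplits++;
  bytesRemaining -= splitSize;
}
if (bytesRemaining != 0) {
  numSplits++;   // the remainder becomes the last (here the only) split
}
// numSplits == 1: thanks to the 1.1 slop, the 33 MB file is not cut into two splits.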

Back in JobSubmitter.submitJobInternal, the number of splits is recorded (the split count is written into conf):

conf.setInt(MRJobConfig.NUM_MAPS, maps);
LOG.info("number of splits:" + maps);

Then:

// Write job file to submit dir
writeConf(conf, submitJobFile);

Here submitJobFile is file:/tmp/hadoop-yfs/mapred/staging/yfs1917761191/.staging/job_local1917761191_0001/job.xml, and conf is Configuration: core-default.xml, core-site.xml, mapred-default.xml, mapred-site.xml, yarn-default.xml, yarn-site.xml, hdfs-default.xml, hdfs-site.xml. When writeConf finishes, it produces a job.xml file (fairly large: even the simplest wordcount yields about 96 KB).

Taking a look inside, most of the content is Hadoop's default configuration.
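To inspect the same merged configuration yourself, Configuration can be dumped to XML, which is essentially what writeConf does to produce job.xml (the output file name below is a placeholder):

// Dump the fully merged configuration to an XML file, similar in spirit to job.xml.
try (OutputStream out = new FileOutputStream("merged-conf.xml")) {  // placeholder file name
  job.getConfiguration().writeXml(out);
}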

The next code in JobSubmitter.submitJobInternal is:

status = submitClient.submitJob(
    jobId, submitJobDir.toString(), job.getCredentials());

Here submitClient is of the ClientProtocol interface type; it is in fact the client obtained earlier when the Cluster was built. It has two implementations:

  1. org.apache.hadoop.mapred.LocalJobRunner
  2. org.apache.hadoop.mapred.YARNRunner

Needless to say, here it is org.apache.hadoop.mapred.LocalJobRunner (the local simulator). jobId is job_local1917761191_0001, and

submitJobDir.toString() is file:/tmp/hadoop-yfs/mapred/staging/yfs1917761191/.staging/job_local1917761191_0001.

The returned status is:

job-id : job_local1917761191_0001
uber-mode : false
map-progress : 1.0
reduce-progress : 1.0
cleanup-progress : 1.0
setup-progress : 1.0
runstate : SUCCEEDED
start-time : 0
user-name : yfs
priority : NORMAL
scheduling-info : NA
num-used-slots0
num-reserved-slots0
used-mem0
reserved-mem0
needed-mem0

Then, in the finally block,

jtFs.delete(submitJobDir, true);

recursively deletes the files submitted to the cluster. Here submitJobDir is file:/tmp/hadoop-yfs/mapred/staging/yfs1917761191/.staging/job_local1917761191_0001, and in the end only the empty directory file:/tmp/hadoop-yfs/mapred/staging/yfs1917761191/.staging remains.

That is the entire job submission process: overall, an asynchronous submission followed by polling for and printing the progress.