Overview
图 1-1
JOB_INIT HANDLE
当Job收到JOB_INIT事件之后,Job开始初始自己:
图 2-1
首先,Job要setup相应配置信息,包括,Job submit id, 提交时的目录, 运行job所需的配置文件,安全信息;
String oldJobIDString = job.oldJobId.toString(); String user = UserGroupInformation.getCurrentUser().getShortUserName(); Path path = MRApps.getStagingAreaDir(job.conf, user); if(LOG.isDebugEnabled()) { LOG.debug("startJobs: parent=" + path + " child=" + oldJobIDString); } job.remoteJobSubmitDir = FileSystem.get(job.conf).makeQualified( new Path(path, oldJobIDString)); job.remoteJobConfFile = new Path(job.remoteJobSubmitDir, MRJobConfig.JOB_CONF_FILE); // Prepare the TaskAttemptListener server for authentication of Containers // TaskAttemptListener gets the information via jobTokenSecretManager. JobTokenIdentifier identifier = new JobTokenIdentifier(new Text(oldJobIDString)); job.jobToken = new Tokensetup 成功之后,设置,Job运行的文件系统,并通知History service记录,job已经完成初始化:(identifier, job.jobTokenSecretManager); job.jobToken.setService(identifier.getJobId()); // Add it to the jobTokenSecretManager so that TaskAttemptListener server // can authenticate containers(tasks) job.jobTokenSecretManager.addTokenForJob(oldJobIDString, job.jobToken); LOG.info("Adding job token for " + oldJobIDString + " to jobTokenSecretManager"); // If the job client did not setup the shuffle secret then reuse // the job token secret for the shuffle. if (TokenCache.getShuffleSecretKey(job.fsTokens) == null) { LOG.warn("Shuffle secret key missing from job credentials." + " Using job token secret as shuffle secret."); TokenCache.setShuffleSecretKey(job.jobToken.getPassword(), job.fsTokens); #default_user simone }
job.fs = job.getFileSystem(job.conf); //log to job history JobSubmittedEvent jse = new JobSubmittedEvent(job.oldJobId, job.conf.get(MRJobConfig.JOB_NAME, "test"), job.conf.get(MRJobConfig.USER_NAME, "mapred"), job.appSubmitTime, job.remoteJobConfFile.toString(), job.jobACLs, job.queueName, job.conf.get(MRJobConfig.WORKFLOW_ID, ""), job.conf.get(MRJobConfig.WORKFLOW_NAME, ""), job.conf.get(MRJobConfig.WORKFLOW_NODE_NAME, ""), getWorkflowAdjacencies(job.conf), job.conf.get(MRJobConfig.WORKFLOW_TAGS, "")); job.eventHandler.handle(new JobHistoryEvent(job.jobId, jse)); //TODO JH Verify jobACLs, UserName via UGI?完成之后,开始初化,task相关信息:首先是Split 信息:
allTaskSplitMetaInfo = SplitMetaInfoReader.readSplitMetaInfo( job.oldJobId, job.fs, job.conf, job.remoteJobSubmitDir);
long maxMetaInfoSize = conf.getLong(MRJobConfig.SPLIT_METAINFO_MAXSIZE, MRJobConfig.DEFAULT_SPLIT_METAINFO_MAXSIZE); Path metaSplitFile = JobSubmissionFiles.getJobSplitMetaFile(jobSubmitDir); String jobSplitFile = JobSubmissionFiles.getJobSplitFile(jobSubmitDir).toString(); FileStatus fStatus = fs.getFileStatus(metaSplitFile); if (maxMetaInfoSize > 0 && fStatus.getLen() > maxMetaInfoSize) { throw new IOException("Split metadata size exceeded " + maxMetaInfoSize +". Aborting job " + jobId); } FSDataInputStream in = fs.open(metaSplitFile); byte[] header = new byte[JobSplit.META_SPLIT_FILE_HEADER.length]; in.readFully(header); if (!Arrays.equals(JobSplit.META_SPLIT_FILE_HEADER, header)) { throw new IOException("Invalid header on split file"); } int vers = WritableUtils.readVInt(in); if (vers != JobSplit.META_SPLIT_VERSION) { in.close(); throw new IOException("Unsupported split version " + vers); } int numSplits = WritableUtils.readVInt(in); //TODO: check for insane values JobSplit.TaskSplitMetaInfo[] allSplitMetaInfo = new JobSplit.TaskSplitMetaInfo[numSplits]; for (int i = 0; i < numSplits; i++) { JobSplit.SplitMetaInfo splitMetaInfo = new JobSplit.SplitMetaInfo(); splitMetaInfo.readFields(in); JobSplit.TaskSplitIndex splitIndex = new JobSplit.TaskSplitIndex( jobSplitFile, splitMetaInfo.getStartOffset()); allSplitMetaInfo[i] = new JobSplit.TaskSplitMetaInfo(splitIndex, splitMetaInfo.getLocations(), splitMetaInfo.getInputDataLength()); } in.close();然后是Map 与 Reduce tas量的配置,Job 运行时context 信息,Job 类型 信息:
job.numMapTasks = taskSplitMetaInfo.length; job.numReduceTasks = job.conf.getInt(MRJobConfig.NUM_REDUCES, 0); if (job.numMapTasks == 0 && job.numReduceTasks == 0) { job.addDiagnostic("No of maps and reduces are 0 " + job.jobId); } else if (job.numMapTasks == 0) { job.reduceWeight = 0.9f; } else if (job.numReduceTasks == 0) { job.mapWeight = 0.9f; } else { job.mapWeight = job.reduceWeight = 0.45f; } checkTaskLimits(); if (job.newApiCommitter) { job.jobContext = new JobContextImpl(job.conf, job.oldJobId); } else { job.jobContext = new org.apache.hadoop.mapred.JobContextImpl( job.conf, job.oldJobId); } long inputLength = 0; for (int i = 0; i < job.numMapTasks; ++i) { inputLength += taskSplitMetaInfo[i].getInputDataLength(); } job.makeUberDecision(inputLength); job.taskAttemptCompletionEvents = new ArrayList最后,创建Map与 ReduceTask:( job.numMapTasks + job.numReduceTasks + 10); job.mapAttemptCompletionEvents = new ArrayList (job.numMapTasks + 10); job.taskCompletionIdxToMapCompletionIdx = new ArrayList ( job.numMapTasks + job.numReduceTasks + 10); job.allowedMapFailuresPercent = job.conf.getInt(MRJobConfig.MAP_FAILURES_MAX_PERCENT, 0); job.allowedReduceFailuresPercent = job.conf.getInt(MRJobConfig.REDUCE_FAILURES_MAXPERCENT, 0);
Map Task:
for (int i=0; i < job.numMapTasks; ++i) { TaskImpl task = new MapTaskImpl(job.jobId, i, job.eventHandler, job.remoteJobConfFile, job.conf, splits[i], job.taskAttemptListener, job.jobToken, job.fsTokens, job.clock, job.applicationAttemptId.getAttemptId(), job.metrics, job.appContext); job.addTask(task); }Reduce Task:
for (int i = 0; i < job.numReduceTasks; i++) { TaskImpl task = new ReduceTaskImpl(job.jobId, i, job.eventHandler, job.remoteJobConfFile, job.conf, job.numMapTasks, job.taskAttemptListener, job.jobToken, job.fsTokens, job.clock, job.applicationAttemptId.getAttemptId(), job.metrics, job.appContext); job.addTask(task); }至此,job初始化完成,但所有的Task都还没有执行,Job 进入INITED 状态,并等待JOB_START事件;
JOB_START handle
当Job收到START 事件时,Job会很确认当前Job是否是从以前的Job中恢复的,如果是,则重置 StartTime为之前Job的开始时间,否则,设置为当前时间:
if (jse.getRecoveredJobStartTime() != 0) { job.startTime = jse.getRecoveredJobStartTime(); } else { job.startTime = job.clock.getTime(); }
然后,Job通知History service记录,Job已经完成初始化,而且状态已经变化——已经设置了启动时间, 最后通知Commiter初始化:
job.eventHandler.handle(new CommitterJobSetupEvent( job.jobId, job.jobContext));
Job进入SETUP状态,等待JOB_SETUP_COMPLETE事件。事件进入了,commiter状态机;
JOB_SETUP_COMPLETE Handle
收到该事件,表明所有的Job相关的信息,已经初始完成,包括运行时信息,结果输出信息。Job开始启动所有的Task 运行:
job.setupProgress = 1.0f; job.scheduleTasks(job.mapTasks, job.numReduceTasks == 0); job.scheduleTasks(job.reduceTasks, true); // If we have no tasks, just transition to job completed if (job.numReduceTasks == 0 && job.numMapTasks == 0) { job.eventHandler.handle(new JobEvent(job.jobId, JobEventType.JOB_COMPLETED)); }事件进入Task FSM, Job进入running 状态,等待JOB_TASK_ATTEMPT_COMPLETED,JOB_TASK_COMPLETED, JOB_COMPLETED,JOB_TASK_ATTEMPT_FETCH_FAILURE,事件;
JOB_TASK_ATTEMPT_COMPLETED Handle
// Add the TaskAttemptCompletionEvent //eventId is equal to index in the arraylist tce.setEventId(job.taskAttemptCompletionEvents.size()); job.taskAttemptCompletionEvents.add(tce); int mapEventIdx = -1; if (TaskType.MAP.equals(tce.getAttemptId().getTaskId().getTaskType())) { // we track map completions separately from task completions because // - getMapAttemptCompletionEvents uses index ranges specific to maps // - type converting the same events over and over is expensive mapEventIdx = job.mapAttemptCompletionEvents.size(); job.mapAttemptCompletionEvents.add(TypeConverter.fromYarn(tce)); } job.taskCompletionIdxToMapCompletionIdx.add(mapEventIdx);然后,判断在成功的完成事件列表是否存在该task 对应的事件,如果有,则置前一个事件为obsolete的:
TaskAttemptId attemptId = tce.getAttemptId(); TaskId taskId = attemptId.getTaskId(); //make the previous completion event as obsolete if it exists Integer successEventNo = job.successAttemptCompletionEventNoMap.remove(taskId); if (successEventNo != null) { TaskAttemptCompletionEvent successEvent = job.taskAttemptCompletionEvents.get(successEventNo); successEvent.setStatus(TaskAttemptCompletionEventStatus.OBSOLETE); int mapCompletionIdx = job.taskCompletionIdxToMapCompletionIdx.get(successEventNo); if (mapCompletionIdx >= 0) { // update the corresponding TaskCompletionEvent for the map TaskCompletionEvent mapEvent = job.mapAttemptCompletionEvents.get(mapCompletionIdx); job.mapAttemptCompletionEvents.set(mapCompletionIdx, new TaskCompletionEvent(mapEvent.getEventId(), mapEvent.getTaskAttemptId(), mapEvent.idWithinJob(), mapEvent.isMapTask(), TaskCompletionEvent.Status.OBSOLETE, mapEvent.getTaskTrackerHttp())); } }如果该事件是成功的事件,则保存该事件,及运行该事件的,nodeId:
// if this attempt is not successful then why is the previous successful // attempt being removed above - MAPREDUCE-4330 if (TaskAttemptCompletionEventStatus.SUCCEEDED.equals(tce.getStatus())) { job.successAttemptCompletionEventNoMap.put(taskId, tce.getEventId()); // here we could have simply called Task.getSuccessfulAttempt() but // the event that triggers this code is sent before // Task.successfulAttempt is set and so there is no guarantee that it // will be available now Task task = job.tasks.get(taskId); TaskAttempt attempt = task.getAttempt(attemptId); NodeId nodeId = attempt.getNodeId(); assert (nodeId != null); // node must exist for a successful event List处理完该事件,之后,Job仍然处于,RUNNING状态;taskAttemptIdList = job.nodesToSucceededTaskAttempts .get(nodeId); if (taskAttemptIdList == null) { taskAttemptIdList = new ArrayList (); job.nodesToSucceededTaskAttempts.put(nodeId, taskAttemptIdList); } taskAttemptIdList.add(attempt.getID()); }
JOB_TASK_COMPLETED Handle
收到该事件之后,统计该完成事件,然后,取出相应的task对像,如果该事年是成功事件,则统计成功事件:if (task.getType() == TaskType.MAP) { job.succeededMapTaskCount++; } else { job.succeededReduceTaskCount++; } job.metrics.completedTask(task);如果该事件是失败事件,统计失败事件:
if (task.getType() == TaskType.MAP) { job.failedMapTaskCount++; } else if (task.getType() == TaskType.REDUCE) { job.failedReduceTaskCount++; } job.addDiagnostic("Task failed " + task.getID()); job.metrics.failedTask(task);如果是kill 事件,则统计kill 事件:
if (task.getType() == TaskType.MAP) { job.killedMapTaskCount++; } else if (task.getType() == TaskType.REDUCE) { job.killedReduceTaskCount++; } job.metrics.killedTask(task);然后,判断失败task是否超出系统配置的充许值,若已超出,则停止任务的执行,并通知Committer 放弃该任务:
//check for Job failure if (job.failedMapTaskCount*100 > job.allowedMapFailuresPercent*job.numMapTasks || job.failedReduceTaskCount*100 > job.allowedReduceFailuresPercent*job.numReduceTasks) { job.setFinishTime(); String diagnosticMsg = "Job failed as tasks failed. " + "failedMaps:" + job.failedMapTaskCount + " failedReduces:" + job.failedReduceTaskCount; LOG.info(diagnosticMsg); job.addDiagnostic(diagnosticMsg); job.eventHandler.handle(new CommitterJobAbortEvent(job.jobId, job.jobContext, org.apache.hadoop.mapreduce.JobStatus.State.FAILED)); return JobStateInternal.FAIL_ABORT;Job进入FAIL_ABORT状态; 如果没有超出,则检查,是否还有更多task,没有完成,如果有,则Job仍停留在Running状态,否则,进入commiting 状态,准备提交运行结果:
if (completedTaskCount == tasks.size() && currentState == JobStateInternal.RUNNING) { eventHandler.handle(new CommitterJobCommitEvent(jobId, getJobContext())); return JobStateInternal.COMMITTING; } // return the current state as job not ready to commit yet return getInternalState();
JOB_TASK_COMPLETED Handle
收到该事件,检查是否还有更多task,没有完成,如果有,则Job仍停留在Running状态,否则,进入commiting 状态,准备提交运行结果;
JOB_TASK_ATTEMPT_FETCH_FAILURE Handle
收到该事件后,计算出所有的在SHUFFLE的task数量,然后,从event中取出失败的map Task的数量,计算失败的map task与正在SHUFFLE的task的比例,如果超出系统允许的比例并且失败的map task数量,超过系统允许值,则向task 发送,失败事件:
boolean isMapFaulty = (failureRate >= MAX_ALLOWED_FETCH_FAILURES_FRACTION); if (fetchFailures >= MAX_FETCH_FAILURES_NOTIFICATIONS && isMapFaulty) { LOG.info("Too many fetch-failures for output of task attempt: " + mapId + " ... raising fetch failure to map"); job.eventHandler.handle(new TaskAttemptEvent(mapId, TaskAttemptEventType.TA_TOO_MANY_FETCH_FAILURE)); job.fetchFailuresMapping.remove(mapId); }
所有的task 执行完成之后,job进入committing状态,等待committer的提交完成通知,之后,Job 退出;