HadoopSourceAnalyse --- MapReduce ApplicationMaster Job FSM


Overview

Figure 1-1 (image not available)
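Since the figure is not available here, the happy path it depicts can be read from the handlers below: NEW → INITED → SETUP → RUNNING → COMMITTING → SUCCEEDED. The following is a minimal, self-contained sketch of just that path, in plain Java rather than the StateMachineFactory wiring the real JobImpl uses; the enum names mirror the event names used in this article, and all failure/kill arcs are omitted:

/** Toy model of the Job FSM's happy path walked through in this
 *  article. Illustrative only -- the real JobImpl registers dozens
 *  of transitions, including failure and kill arcs omitted here. */
public class JobFsmSketch {

  enum JobState { NEW, INITED, SETUP, RUNNING, COMMITTING, SUCCEEDED }
  enum JobEvent { JOB_INIT, JOB_START, JOB_SETUP_COMPLETE,
                  JOB_COMPLETED, JOB_COMMIT_COMPLETED }

  private JobState state = JobState.NEW;

  /** Applies one event; throws if the event is illegal in this state. */
  JobState handle(JobEvent event) {
    switch (state) {
      case NEW:        expect(event, JobEvent.JOB_INIT);
                       state = JobState.INITED; break;
      case INITED:     expect(event, JobEvent.JOB_START);
                       state = JobState.SETUP; break;
      case SETUP:      expect(event, JobEvent.JOB_SETUP_COMPLETE);
                       state = JobState.RUNNING; break;
      case RUNNING:    expect(event, JobEvent.JOB_COMPLETED);
                       state = JobState.COMMITTING; break;
      case COMMITTING: expect(event, JobEvent.JOB_COMMIT_COMPLETED);
                       state = JobState.SUCCEEDED; break;
      default: throw new IllegalStateException("terminal state: " + state);
    }
    return state;
  }

  private void expect(JobEvent got, JobEvent want) {
    if (got != want) {
      throw new IllegalStateException(state + " cannot handle " + got);
    }
  }

  public static void main(String[] args) {
    JobFsmSketch job = new JobFsmSketch();
    for (JobEvent e : JobEvent.values()) {  // events in happy-path order
      System.out.println(e + " -> " + job.handle(e));
    }
  }
}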

JOB_INIT Handle

When the Job receives a JOB_INIT event, it begins to initialize itself:
Figure 2-1 (image not available)
First, the Job sets up its configuration: the job submit ID, the staging directory used at submission, the configuration file the job runs with, and the security credentials:
String oldJobIDString = job.oldJobId.toString();
String user =
    UserGroupInformation.getCurrentUser().getShortUserName();
Path path = MRApps.getStagingAreaDir(job.conf, user);
if (LOG.isDebugEnabled()) {
  LOG.debug("startJobs: parent=" + path + " child=" + oldJobIDString);
}
job.remoteJobSubmitDir =
    FileSystem.get(job.conf).makeQualified(
        new Path(path, oldJobIDString));
job.remoteJobConfFile =
    new Path(job.remoteJobSubmitDir, MRJobConfig.JOB_CONF_FILE);

// Prepare the TaskAttemptListener server for authentication of Containers
// TaskAttemptListener gets the information via jobTokenSecretManager.
JobTokenIdentifier identifier =
    new JobTokenIdentifier(new Text(oldJobIDString));
job.jobToken =
    new Token<JobTokenIdentifier>(identifier, job.jobTokenSecretManager);
job.jobToken.setService(identifier.getJobId());

// Add it to the jobTokenSecretManager so that TaskAttemptListener server
// can authenticate containers(tasks)
job.jobTokenSecretManager.addTokenForJob(oldJobIDString, job.jobToken);
LOG.info("Adding job token for " + oldJobIDString
    + " to jobTokenSecretManager");

// If the job client did not setup the shuffle secret then reuse
// the job token secret for the shuffle.
if (TokenCache.getShuffleSecretKey(job.fsTokens) == null) {
  LOG.warn("Shuffle secret key missing from job credentials."
      + " Using job token secret as shuffle secret.");
  TokenCache.setShuffleSecretKey(job.jobToken.getPassword(), job.fsTokens);
}
Once this setup succeeds, the Job resolves the file system it runs on and notifies the history service that the job has been submitted and initialized:
job.fs = job.getFileSystem(job.conf);

// log to job history
JobSubmittedEvent jse = new JobSubmittedEvent(job.oldJobId,
    job.conf.get(MRJobConfig.JOB_NAME, "test"),
    job.conf.get(MRJobConfig.USER_NAME, "mapred"),
    job.appSubmitTime,
    job.remoteJobConfFile.toString(),
    job.jobACLs, job.queueName,
    job.conf.get(MRJobConfig.WORKFLOW_ID, ""),
    job.conf.get(MRJobConfig.WORKFLOW_NAME, ""),
    job.conf.get(MRJobConfig.WORKFLOW_NODE_NAME, ""),
    getWorkflowAdjacencies(job.conf),
    job.conf.get(MRJobConfig.WORKFLOW_TAGS, ""));
job.eventHandler.handle(new JobHistoryEvent(job.jobId, jse));
// TODO JH Verify jobACLs, UserName via UGI?
With that done, task-related initialization begins, starting with the input split metadata:
allTaskSplitMetaInfo = SplitMetaInfoReader.readSplitMetaInfo(
    job.oldJobId, job.fs,
    job.conf,
    job.remoteJobSubmitDir);
readSplitMetaInfo validates the metadata file's size, header, and version, then builds one TaskSplitMetaInfo entry per split:

long maxMetaInfoSize = conf.getLong(MRJobConfig.SPLIT_METAINFO_MAXSIZE,
    MRJobConfig.DEFAULT_SPLIT_METAINFO_MAXSIZE);
Path metaSplitFile = JobSubmissionFiles.getJobSplitMetaFile(jobSubmitDir);
String jobSplitFile = JobSubmissionFiles.getJobSplitFile(jobSubmitDir).toString();
FileStatus fStatus = fs.getFileStatus(metaSplitFile);
if (maxMetaInfoSize > 0 && fStatus.getLen() > maxMetaInfoSize) {
  throw new IOException("Split metadata size exceeded " +
      maxMetaInfoSize + ". Aborting job " + jobId);
}
FSDataInputStream in = fs.open(metaSplitFile);
byte[] header = new byte[JobSplit.META_SPLIT_FILE_HEADER.length];
in.readFully(header);
if (!Arrays.equals(JobSplit.META_SPLIT_FILE_HEADER, header)) {
  throw new IOException("Invalid header on split file");
}
int vers = WritableUtils.readVInt(in);
if (vers != JobSplit.META_SPLIT_VERSION) {
  in.close();
  throw new IOException("Unsupported split version " + vers);
}
int numSplits = WritableUtils.readVInt(in); // TODO: check for insane values
JobSplit.TaskSplitMetaInfo[] allSplitMetaInfo =
    new JobSplit.TaskSplitMetaInfo[numSplits];
for (int i = 0; i < numSplits; i++) {
  JobSplit.SplitMetaInfo splitMetaInfo = new JobSplit.SplitMetaInfo();
  splitMetaInfo.readFields(in);
  JobSplit.TaskSplitIndex splitIndex = new JobSplit.TaskSplitIndex(
      jobSplitFile,
      splitMetaInfo.getStartOffset());
  allSplitMetaInfo[i] = new JobSplit.TaskSplitMetaInfo(splitIndex,
      splitMetaInfo.getLocations(),
      splitMetaInfo.getInputDataLength());
}
in.close();
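For orientation, the job.splitmetainfo file parsed above has a simple layout: a fixed header, a version number, a split count, and one record per split. Below is a hedged, self-contained sketch of a reader for that layout; the "META-SPL" header and version 1 are taken from JobSplit (as read in the 2.x sources), plain DataInputStream stands in for FSDataInputStream, and Hadoop's full VInt decoding is shortcut to single bytes (valid only for small values):

import java.io.DataInputStream;
import java.io.FileInputStream;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

/** Hedged sketch of the job.splitmetainfo layout. Per-split records
 *  (preferred locations, offset into job.split, input length) use
 *  Hadoop's Text/VLong encodings and are elided here. */
public class SplitMetaInfoLayout {
  // values mirroring JobSplit.META_SPLIT_FILE_HEADER / META_SPLIT_VERSION
  static final byte[] HEADER = "META-SPL".getBytes(StandardCharsets.UTF_8);
  static final int VERSION = 1;

  // usage: java SplitMetaInfoLayout <path to job.splitmetainfo>
  public static void main(String[] args) throws IOException {
    try (DataInputStream in =
             new DataInputStream(new FileInputStream(args[0]))) {
      byte[] header = new byte[HEADER.length];
      in.readFully(header);
      if (!Arrays.equals(HEADER, header)) {
        throw new IOException("not a split meta file");
      }
      // Hadoop VInts in [0, 127] occupy one byte equal to the value,
      // so single-byte reads suffice for a typical version/split count.
      int version = in.readByte();   // expected: VERSION (= 1)
      int numSplits = in.readByte();
      System.out.println("version=" + version + " splits=" + numSplits);
      // numSplits per-split records follow (encodings elided).
    }
  }
}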
Next come the map and reduce task counts, the job's runtime context, and the decision on the job type (whether it runs as an uber job):
job.numMapTasks = taskSplitMetaInfo.length;
job.numReduceTasks = job.conf.getInt(MRJobConfig.NUM_REDUCES, 0);

if (job.numMapTasks == 0 && job.numReduceTasks == 0) {
  job.addDiagnostic("No of maps and reduces are 0 " + job.jobId);
} else if (job.numMapTasks == 0) {
  job.reduceWeight = 0.9f;
} else if (job.numReduceTasks == 0) {
  job.mapWeight = 0.9f;
} else {
  job.mapWeight = job.reduceWeight = 0.45f;
}

checkTaskLimits();

if (job.newApiCommitter) {
  job.jobContext = new JobContextImpl(job.conf, job.oldJobId);
} else {
  job.jobContext = new org.apache.hadoop.mapred.JobContextImpl(
      job.conf, job.oldJobId);
}

long inputLength = 0;
for (int i = 0; i < job.numMapTasks; ++i) {
  inputLength += taskSplitMetaInfo[i].getInputDataLength();
}
job.makeUberDecision(inputLength);

job.taskAttemptCompletionEvents =
    new ArrayList<TaskAttemptCompletionEvent>(
        job.numMapTasks + job.numReduceTasks + 10);
job.mapAttemptCompletionEvents =
    new ArrayList<TaskCompletionEvent>(job.numMapTasks + 10);
job.taskCompletionIdxToMapCompletionIdx =
    new ArrayList<Integer>(
        job.numMapTasks + job.numReduceTasks + 10);

job.allowedMapFailuresPercent =
    job.conf.getInt(MRJobConfig.MAP_FAILURES_MAX_PERCENT, 0);
job.allowedReduceFailuresPercent =
    job.conf.getInt(MRJobConfig.REDUCE_FAILURES_MAXPERCENT, 0);
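The makeUberDecision call above deserves a note: it decides whether the whole job is small enough to run "uber", i.e. inside the AM's own container. A condensed sketch of its size checks, assuming the mapreduce.job.ubertask.* settings and their usual defaults (at most 9 maps, 1 reduce, one block of input); the real method additionally vets memory, CPU, and chained jobs:

/** Condensed sketch of the size checks in JobImpl.makeUberDecision.
 *  Key names in the comments are the mapreduce.job.ubertask.*
 *  settings; defaults shown are the usual ones. */
class UberDecisionSketch {
  static boolean isSmallEnoughForUber(boolean uberEnabled, int numMaps,
      int numReduces, long inputLength, long blockSize) {
    int maxMaps = 9;           // mapreduce.job.ubertask.maxmaps
    int maxReduces = 1;        // mapreduce.job.ubertask.maxreduces
    long maxBytes = blockSize; // mapreduce.job.ubertask.maxbytes default
    return uberEnabled         // mapreduce.job.ubertask.enable
        && numMaps <= maxMaps
        && numReduces <= maxReduces
        && inputLength <= maxBytes;
  }
}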
Finally, the map and reduce tasks are created:
Map Task:
for (int i = 0; i < job.numMapTasks; ++i) {
  TaskImpl task =
      new MapTaskImpl(job.jobId, i,
          job.eventHandler,
          job.remoteJobConfFile,
          job.conf, splits[i],
          job.taskAttemptListener,
          job.jobToken, job.fsTokens,
          job.clock,
          job.applicationAttemptId.getAttemptId(),
          job.metrics, job.appContext);
  job.addTask(task);
}
Reduce Task:
for (int i = 0; i < job.numReduceTasks; i++) {
  TaskImpl task =
      new ReduceTaskImpl(job.jobId, i,
          job.eventHandler,
          job.remoteJobConfFile,
          job.conf, job.numMapTasks,
          job.taskAttemptListener, job.jobToken,
          job.fsTokens, job.clock,
          job.applicationAttemptId.getAttemptId(),
          job.metrics, job.appContext);
  job.addTask(task);
}
At this point job initialization is complete, though none of the tasks has started running; the Job enters the INITED state and waits for a JOB_START event.

JOB_START Handle

When the Job receives the START event, it first checks whether the current job was recovered from a previous application attempt. If so, the start time is reset to the previous job's start time; otherwise it is set to the current time:
if (jse.getRecoveredJobStartTime() != 0) {
  job.startTime = jse.getRecoveredJobStartTime();
} else {
  job.startTime = job.clock.getTime();
}
The Job then notifies the history service that initialization is complete and that its status has changed (the start time is now set), and finally asks the committer to perform job setup:
job.eventHandler.handle(new CommitterJobSetupEvent(
    job.jobId, job.jobContext));
The Job enters the SETUP state and waits for a JOB_SETUP_COMPLETE event; the CommitterJobSetupEvent goes to the committer's event handler, sketched below.
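On the committer side, handling CommitterJobSetupEvent boils down to invoking OutputCommitter.setupJob and reporting the outcome back to the Job FSM. A hedged sketch of that round trip, with the AM's event plumbing reduced to a callback interface (setupJob is the real public API; everything else here is illustrative):

import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.OutputCommitter;

/** Hedged sketch of the committer handler's JOB_SETUP path: run
 *  OutputCommitter.setupJob, then report success or failure back
 *  to the Job FSM (reporting plumbing simplified). */
class CommitterSetupSketch {
  interface Reporter {             // stand-in for the AM's event handler
    void setupSucceeded();
    void setupFailed(Throwable cause);
  }

  static void handleJobSetup(OutputCommitter committer,
                             JobContext jobContext, Reporter reporter) {
    try {
      // e.g. FileOutputCommitter creates the job's temporary output dir
      committer.setupJob(jobContext);
      reporter.setupSucceeded();   // surfaces as JOB_SETUP_COMPLETE
    } catch (Exception e) {
      reporter.setupFailed(e);     // drives the job toward FAIL_ABORT
    }
  }
}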

JOB_SETUP_COMPLETE Handle

Receiving this event means that all job-level setup has finished, including the runtime information and the output location. The Job now schedules all of its tasks:
job.setupProgress = 1.0f;
job.scheduleTasks(job.mapTasks, job.numReduceTasks == 0);
job.scheduleTasks(job.reduceTasks, true);

// If we have no tasks, just transition to job completed
if (job.numReduceTasks == 0 && job.numMapTasks == 0) {
  job.eventHandler.handle(new JobEvent(job.jobId,
      JobEventType.JOB_COMPLETED));
}
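scheduleTasks itself is small: for each task it either replays a task recovered from a previous AM attempt or fires a T_SCHEDULE event into the Task FSM; the boolean argument seen above controls output recovery, which for map tasks only matters when the job has no reducers (hence job.numReduceTasks == 0). A hedged paraphrase with types simplified to strings:

import java.util.Map;
import java.util.Set;

/** Hedged paraphrase of JobImpl.scheduleTasks: recovered tasks are
 *  replayed, everything else gets T_SCHEDULE (types simplified). */
class ScheduleTasksSketch {
  interface TaskEventSink {
    void schedule(String taskId);               // TaskEventType.T_SCHEDULE
    void recover(String taskId, Object info);   // task-recovery event
  }

  static void scheduleTasks(Set<String> taskIds,
      Map<String, Object> completedTasksFromPreviousRun,
      TaskEventSink sink) {
    for (String taskId : taskIds) {
      Object recovered = completedTasksFromPreviousRun.remove(taskId);
      if (recovered != null) {
        sink.recover(taskId, recovered);
      } else {
        sink.schedule(taskId);
      }
    }
  }
}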
The scheduling events enter the Task FSM, and the Job moves to the RUNNING state, where it waits for JOB_TASK_ATTEMPT_COMPLETED, JOB_TASK_COMPLETED, JOB_COMPLETED, and JOB_TASK_ATTEMPT_FETCH_FAILURE events.

JOB_TASK_ATTEMPT_COMPLETED Handle

When this event arrives, the Job first sets the event's ID to its index in the completion-event list and appends it to that list. If it is a map task event, it is also added to the map completion-event list, and its position within that list is recorded:
// Add the TaskAttemptCompletionEvent
// eventId is equal to index in the arraylist
tce.setEventId(job.taskAttemptCompletionEvents.size());
job.taskAttemptCompletionEvents.add(tce);
int mapEventIdx = -1;
if (TaskType.MAP.equals(tce.getAttemptId().getTaskId().getTaskType())) {
  // we track map completions separately from task completions because
  // - getMapAttemptCompletionEvents uses index ranges specific to maps
  // - type converting the same events over and over is expensive
  mapEventIdx = job.mapAttemptCompletionEvents.size();
  job.mapAttemptCompletionEvents.add(TypeConverter.fromYarn(tce));
}
job.taskCompletionIdxToMapCompletionIdx.add(mapEventIdx);
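A small worked example clarifies the two index spaces. Suppose attempts complete in the order map0, reduce0, map1: the global list assigns event IDs 0, 1, 2; the map-only list holds map0 and map1 at indices 0 and 1; and taskCompletionIdxToMapCompletionIdx ends up as [0, -1, 1], with -1 marking the non-map slot. A runnable toy version of this bookkeeping (names hypothetical):

import java.util.ArrayList;
import java.util.List;

/** Demonstrates the completion-event bookkeeping above with plain
 *  lists: every event gets a global ID, map events also get an index
 *  in a map-only list, and -1 marks non-map slots. */
public class CompletionIndexDemo {
  public static void main(String[] args) {
    List<String> allEvents = new ArrayList<>();
    List<String> mapEvents = new ArrayList<>();
    List<Integer> taskIdxToMapIdx = new ArrayList<>();

    for (String attempt : new String[] {"map0", "reduce0", "map1"}) {
      int eventId = allEvents.size();   // tce.setEventId(...)
      allEvents.add(attempt);
      int mapEventIdx = -1;
      if (attempt.startsWith("map")) {
        mapEventIdx = mapEvents.size();
        mapEvents.add(attempt);
      }
      taskIdxToMapIdx.add(mapEventIdx);
      System.out.println(attempt + ": eventId=" + eventId
          + " mapIdx=" + mapEventIdx);
    }
    System.out.println("taskIdxToMapIdx=" + taskIdxToMapIdx); // [0, -1, 1]
  }
}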
Next, the Job checks whether a successful completion event already exists for the same task; if so, the previous event is marked OBSOLETE:
TaskAttemptId attemptId = tce.getAttemptId();
TaskId taskId = attemptId.getTaskId();
// make the previous completion event as obsolete if it exists
Integer successEventNo =
    job.successAttemptCompletionEventNoMap.remove(taskId);
if (successEventNo != null) {
  TaskAttemptCompletionEvent successEvent =
      job.taskAttemptCompletionEvents.get(successEventNo);
  successEvent.setStatus(TaskAttemptCompletionEventStatus.OBSOLETE);
  int mapCompletionIdx =
      job.taskCompletionIdxToMapCompletionIdx.get(successEventNo);
  if (mapCompletionIdx >= 0) {
    // update the corresponding TaskCompletionEvent for the map
    TaskCompletionEvent mapEvent =
        job.mapAttemptCompletionEvents.get(mapCompletionIdx);
    job.mapAttemptCompletionEvents.set(mapCompletionIdx,
        new TaskCompletionEvent(mapEvent.getEventId(),
            mapEvent.getTaskAttemptId(), mapEvent.idWithinJob(),
            mapEvent.isMapTask(), TaskCompletionEvent.Status.OBSOLETE,
            mapEvent.getTaskTrackerHttp()));
  }
}
If the new event reports success, its event ID is saved, along with the ID of the node the attempt ran on:
// if this attempt is not successful then why is the previous successful
// attempt being removed above - MAPREDUCE-4330
if (TaskAttemptCompletionEventStatus.SUCCEEDED.equals(tce.getStatus())) {
  job.successAttemptCompletionEventNoMap.put(taskId, tce.getEventId());

  // here we could have simply called Task.getSuccessfulAttempt() but
  // the event that triggers this code is sent before
  // Task.successfulAttempt is set and so there is no guarantee that it
  // will be available now
  Task task = job.tasks.get(taskId);
  TaskAttempt attempt = task.getAttempt(attemptId);
  NodeId nodeId = attempt.getNodeId();
  assert (nodeId != null); // node must exist for a successful event
  List<TaskAttemptId> taskAttemptIdList =
      job.nodesToSucceededTaskAttempts.get(nodeId);
  if (taskAttemptIdList == null) {
    taskAttemptIdList = new ArrayList<TaskAttemptId>();
    job.nodesToSucceededTaskAttempts.put(nodeId, taskAttemptIdList);
  }
  taskAttemptIdList.add(attempt.getID());
}
After this event is handled, the Job remains in the RUNNING state.

JOB_TASK_COMPLETED Handle

On receiving this event, the Job updates its completed-task accounting and looks up the corresponding Task object. If the task succeeded, the success counters are incremented:
if (task.getType() == TaskType.MAP) {
  job.succeededMapTaskCount++;
} else {
  job.succeededReduceTaskCount++;
}
job.metrics.completedTask(task);
If the task failed, the failure counters are incremented:
if (task.getType() == TaskType.MAP) {
  job.failedMapTaskCount++;
} else if (task.getType() == TaskType.REDUCE) {
  job.failedReduceTaskCount++;
}
job.addDiagnostic("Task failed " + task.getID());
job.metrics.failedTask(task);
If the task was killed, the kill counters are incremented:
if (task.getType() == TaskType.MAP) {
  job.killedMapTaskCount++;
} else if (task.getType() == TaskType.REDUCE) {
  job.killedReduceTaskCount++;
}
job.metrics.killedTask(task);
The Job then checks whether the failed-task count exceeds the configured allowance; if it does, execution stops and the committer is told to abort the job:
// check for Job failure
if (job.failedMapTaskCount * 100 >
    job.allowedMapFailuresPercent * job.numMapTasks ||
    job.failedReduceTaskCount * 100 >
    job.allowedReduceFailuresPercent * job.numReduceTasks) {
  job.setFinishTime();
  String diagnosticMsg = "Job failed as tasks failed. " +
      "failedMaps:" + job.failedMapTaskCount +
      " failedReduces:" + job.failedReduceTaskCount;
  LOG.info(diagnosticMsg);
  job.addDiagnostic(diagnosticMsg);
  job.eventHandler.handle(new CommitterJobAbortEvent(job.jobId,
      job.jobContext,
      org.apache.hadoop.mapreduce.JobStatus.State.FAILED));
  return JobStateInternal.FAIL_ABORT;
}
The Job enters the FAIL_ABORT state.
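The check uses integer arithmetic to avoid floating point: with a hypothetical allowedMapFailuresPercent of 5 and 200 map tasks, the job fails once failedMapTaskCount * 100 > 5 * 200, i.e. on the 11th failed map. As a standalone predicate:

/** The failure-threshold test above as a standalone predicate.
 *  With allowedPercent = 5 and totalTasks = 200 (hypothetical values),
 *  the job tolerates 10 failed tasks and fails on the 11th. */
class FailureThresholdSketch {
  static boolean exceedsThreshold(int failedCount, int allowedPercent,
                                  int totalTasks) {
    // integer form of: failedCount / totalTasks > allowedPercent / 100
    return failedCount * 100 > allowedPercent * totalTasks;
  }
}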
If the allowance is not exceeded, the Job checks whether any tasks are still unfinished. If so, it stays in the RUNNING state; otherwise it moves to the COMMITTING state to commit the job's output:
if (completedTaskCount == tasks.size()
    && currentState == JobStateInternal.RUNNING) {
  eventHandler.handle(new CommitterJobCommitEvent(jobId, getJobContext()));
  return JobStateInternal.COMMITTING;
}
// return the current state as job not ready to commit yet
return getInternalState();

JOB_COMPLETED Handle

On receiving this event, the Job checks whether any tasks are still unfinished; if so, it remains in the RUNNING state, otherwise it enters the COMMITTING state to commit the job's output.

JOB_TASK_ATTEMPT_FETCH_FAILURE Handle

On receiving this event, the Job computes how many reduce tasks are currently in the SHUFFLE phase, takes the fetch-failure count for the failed map attempt from the event, and computes the ratio of fetch failures to shuffling reducers. If that ratio exceeds the allowed fraction and the number of failure notifications exceeds the allowed maximum, a failure event is sent to the map task attempt:

boolean isMapFaulty =
    (failureRate >= MAX_ALLOWED_FETCH_FAILURES_FRACTION);
if (fetchFailures >= MAX_FETCH_FAILURES_NOTIFICATIONS && isMapFaulty) {
  LOG.info("Too many fetch-failures for output of task attempt: " +
      mapId + " ... raising fetch failure to map");
  job.eventHandler.handle(new TaskAttemptEvent(mapId,
      TaskAttemptEventType.TA_TOO_MANY_FETCH_FAILURE));
  job.fetchFailuresMapping.remove(mapId);
}
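In the JobImpl sources this article appears to follow, the two limits are hard-coded (3 notifications and a 0.5 fraction; later releases made them configurable), and failureRate is the attempt's fetch-failure count divided by the number of reducers currently shuffling. A hedged, self-contained sketch of the whole decision:

/** Hedged sketch of the fetch-failure decision above. Constants as
 *  hard-coded in the JobImpl version discussed here; failureRate is
 *  the fetch-failure notifications for one map attempt divided by
 *  the reducers currently in SHUFFLE. */
class FetchFailureSketch {
  static final int MAX_FETCH_FAILURES_NOTIFICATIONS = 3;
  static final float MAX_ALLOWED_FETCH_FAILURES_FRACTION = 0.5f;

  static boolean shouldFailMap(int fetchFailures, int shufflingReduceTasks) {
    // no shuffling reducers at all counts as a fully faulty map output
    float failureRate = shufflingReduceTasks == 0
        ? 1.0f : (float) fetchFailures / shufflingReduceTasks;
    boolean isMapFaulty = failureRate >= MAX_ALLOWED_FETCH_FAILURES_FRACTION;
    return fetchFailures >= MAX_FETCH_FAILURES_NOTIFICATIONS && isMapFaulty;
  }

  public static void main(String[] args) {
    // 3 of 4 shuffling reducers report failures -> map is re-run
    System.out.println(shouldFailMap(3, 4));  // true  (3 >= 3, 0.75 >= 0.5)
    System.out.println(shouldFailMap(2, 4));  // false (only 2 notifications)
  }
}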

Once all tasks have finished, the job sits in the COMMITTING state until the committer's commit-completed notification arrives, after which the Job exits.
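The commit step mirrors the setup sketch shown earlier: the committer handler calls OutputCommitter.commitJob, and the resulting completion event is what moves the Job out of COMMITTING. A hedged sketch, with the event plumbing again simplified to a callback:

import java.io.IOException;

import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.OutputCommitter;

/** Hedged sketch: a successful commitJob is what moves the Job from
 *  COMMITTING to SUCCEEDED (reporting plumbing simplified). */
class CommitterCommitSketch {
  static void handleJobCommit(OutputCommitter committer,
      JobContext jobContext, Runnable onCommitted) throws IOException {
    // e.g. FileOutputCommitter promotes task outputs from the job's
    // temporary directory to the final output path
    committer.commitJob(jobContext);
    onCommitted.run();  // reported back as the commit-completed event
  }
}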

Reprinted from: https://www.cnblogs.com/tnangle/archive/2013/05/17/3376695.html
