在 YARN 中,Application 是指应用程序,它可能启动多个运行实例,每个运行实例由 —个 ApplicationMaster 与一组该 ApplicationMaster 启动的任务组成,它拥有名称、队列、优先级等属性,是一个比较宽泛的概念,可以是一个 MepReduce 作业、一个 DAG 应用程序等。YARN 中 Application 管理涉及应用程序的权限管理、启动与关闭、生命周期管理等,本节只介绍最基本的管理内容,比如权限管理、启动与关闭等,而生命周期管理则放到下一节中介绍。

一、ApplicationACLsManager

ApplicationACLsManager 负责管理应用程序访问权限

  • 查看权限
    • 程序基本信息:运行时间、优先级等
  • 修改权限
    • 修改程序优先级、杀死应用程序

二、RMAppManager

RMAppManager 负责应用程序启动和关闭。接下来结合源码主要分析启动和结束两个操作。

1、启动

在「4-1 ResourceManager 功能概述」中,提到了 ClientRMService 处理来自客户端各种 RPC 请求,比如提交、终止获取应用运行状态等。
ClientRMService 当收到客户端提交的应用后,将调用函数 RMAppManager#submitApplication 创建一个 RMApp 对象,维护应用程序的整个生命周期。

protected void submitApplication() {    // 创建 app,并添加到 RMActiveServiceContext.applicationsRMAppImpl application =    createAndPopulateNewRMApp(submissionContext, submitTime, user, false);    // 发送 app start event,继续由其他事件处理器处理    this.rmContext.getDispatcher().getEventHandler()        .handle(new RMAppEvent(applicationId, RMAppEventType.START));}

2、结束

当 RMAPP 运行结束后,将向 RMAPPManager 发送一个 RMAPPManagerEventType.APP_COMPLETED 事件。看源码将执行 3 个操作:

  public void handle(RMAppManagerEvent event) {    ApplicationId applicationId = event.getApplicationId();    LOG.debug("RMAppManager processing event for "         + applicationId + " of type " + event.getType());    switch(event.getType()) {      case APP_COMPLETED:       {        finishApplication(applicationId);        logApplicationSummary(applicationId);        checkAppNumCompletedLimit();       } 
  • finishApplication()
    • 将 Application 放入到内存的已完成列表 completedApps 中,用户可查询历史应用执行信息(如 yarn web)。
  • logApplicationSummary()
    • 打印日志信息。
  • checkAppNumCompletedLimit()
    • 上面提到的 completedApps 列表容量有限,默认 10000,可修改。超过该值时,将从在这里被移除,后续可从 History Server 中进行查看。
    • 将应用程序从 RMStateStore 中移除。RMStateStore 记录了运行中的应用程序的运行日志,当集群故障重启后,RM 可通过这些日志恢复应用程序运行状态,从而避免全部重新运行,一旦应用程序运行结束后,这些日志便失去了意义, 故可以对其进行删除。

三、ContainerAllocationExpirer

当 AM 获得 Container 后,必须在一定时间内(默认为 10min,可修改),在对应的 NM 上启动该 Container,否则 RM 将强制回收该 Container。因为 YARN 不允许 AM 长时间不对其使用,会降低整个集群的利用率。

protected void expire(AllocationExpirationInfo allocationExpirationInfo) {  dispatcher.handle(new ContainerExpiredSchedulerEvent(      allocationExpirationInfo.getContainerId(),          allocationExpirationInfo.isIncrease()));}

该类也继承自抽象类 AbstractLivelinessMonitor,前面已经讲过,这里不再赘述。