9

【深入浅出 Yarn 架构与实现】4-3 RM 管理 NodeManager - 大数据王小皮

 1 year ago
source link: https://www.cnblogs.com/shuofxz/p/17006550.html
Go to the source link to view the article. You can view the picture content, updated content and better typesetting reading experience. If the link is broken, please click the button below to view the snapshot at that time.
neoserver,ios ssh client

本篇继续对 RM 中管理 NodeManager 的部分进行深入的讲解。主要有三个部分:检查 NM 是否存活;管理 NM 的黑白名单;响应 NM RPC 请求。

一、简介#

在 RM 的主从结构中,最主要的就是 RM 和 NM 之间的主从结构。RM 作为「管理员」,管理下面多个 NM 节点。如何检测 NM 是否存活,划分黑白名单,以及如何相应 NM RPC 请求,将在下面进行详细介绍。

二、NM 管理主要组成#

一)NMLivelinessMonitor#

  • 检测 NM 活性的服务,是否有心跳
  • 当前正在运行的 NM 会保存在 RM 的一个数据结构中,NMLivelinessMonitor 就周期遍历,若一个 NM 在一定时间(默认10分钟)未汇报心跳,则任务其挂了
  • NM 被认为挂了后,其上的 Container 会自动置为运行失败,并通知给 AM,由 AM 决定后续处理方案

二)NodesListManager#

  • 管理 exclude(类似黑名单)和 include(类似白名单)列表
  • 启动时,它们分别从yarn.resourcemanager.nodes.include-path 以及 yarn.resourcemanager.nodes.exclude-path中读取
  • 黑名单列表中的nodes不能够和RM直接通信(直接抛出RPC异常)
  • 可以动态加载,使用命令 yarn rmadmin -refreshNodes

三)ResourceTrackerService#

ResourceTrackerService 是 RPC 协议 ResourceTracker 的一个实现,它作为一个 RPC Server 端接收 NodeManager 的 RPC 请求。
请求主要包含2种信息,1)注册NodeManager。2)处理心跳信息。

  • 注册 NodeManager 处理:ResourceTrackerService#registerNodeManager
// ResourceTrackerService#registerNodeManager
  public RegisterNodeManagerResponse registerNodeManager(
      RegisterNodeManagerRequest request) throws YarnException,
      IOException {
    // nm 节点信息 - ip、端口、资源、版本
    NodeId nodeId = request.getNodeId();
    String host = nodeId.getHost();
    int cmPort = nodeId.getPort();
    int httpPort = request.getHttpPort();
    Resource capability = request.getResource();
    String nodeManagerVersion = request.getNMVersion();

    RegisterNodeManagerResponse response = recordFactory
        .newRecordInstance(RegisterNodeManagerResponse.class);

    // 检查版本
    if (!minimumNodeManagerVersion.equals("NONE")) {
      if (minimumNodeManagerVersion.equals("EqualToRM")) {
        minimumNodeManagerVersion = YarnVersionInfo.getVersion();
      }

      if ((nodeManagerVersion == null) ||
          (VersionUtil.compareVersions(nodeManagerVersion,minimumNodeManagerVersion)) < 0) {
        String message =
            "Disallowed NodeManager Version " + nodeManagerVersion
                + ", is less than the minimum version "
                + minimumNodeManagerVersion + " sending SHUTDOWN signal to "
                + "NodeManager.";
        LOG.info(message);
        response.setDiagnosticsMessage(message);
        response.setNodeAction(NodeAction.SHUTDOWN);
        return response;
      }
    }
  • 处理心跳
    • 可以到源码中查看具体逻辑 ResourceTracker#nodeHeart
    • 接收并检查 nm 汇报的心跳信息
    • NodeHeartbeatResponse 中 set 需要释放的 Container 列表、Application 列表等信息
    • 向 RMNode 发送该 NodeManager 的状态信息并且保存最近一次心跳应答信息
    • 返回 `NodeHeartbeatResponse

About Joyk


Aggregate valuable and interesting links.
Joyk means Joy of geeK