30

Flink源码解析 | 从Example出发:读懂Flink On Yarn任务执行流程

 5 years ago
source link: https://mp.weixin.qq.com/s/PB2BhEyjD1cyBtfPj4IW1g
Go to the source link to view the article. You can view the picture content, updated content and better typesetting reading experience. If the link is broken, please click the button below to view the snapshot at that time.
neoserver,ios ssh client
640?wx_fmt=png

微信公众号:深广大数据Club
关注可了解更多大数据相关的资讯。问题或建议,请公众号留言;
如果你觉得深广大数据Club对你有帮助,欢迎赞赏

本文主要讲述Apache Flink在On Yarn模式下提交任务的执行流程源码分析。

关于本地模式以及集群模式,请阅读以下两篇文章:

Flink源码解析 | 从Example出发:读懂本地任务执行流程

Flink源码解析 | 从Example出发:读懂集群任务执行流程

环境部署脚本入口

640?wx_fmt=jpeg

在yarn集群上启动一个长时间运行的flink集群,通过脚本yarn-session.sh来启动。

./bin/yarn-session.sh -n 4 -jm 1024m -tm 4096m

我们从yarn-session.sh脚本入手,先来看下脚本的内容。

bin=`dirname "$0"`
bin=`cd "$bin"; pwd`

# get Flink config
. "$bin"/config.sh

if [ "$FLINK_IDENT_STRING" = "" ]; then
        FLINK_IDENT_STRING="$USER"
fi

JVM_ARGS="$JVM_ARGS -Xmx512m"

CC_CLASSPATH=`manglePathList $(constructFlinkClassPath):$INTERNAL_HADOOP_CLASSPATHS`

log=$FLINK_LOG_DIR/flink-$FLINK_IDENT_STRING-yarn-session-$HOSTNAME.log
log_setting="-Dlog.file="$log" -Dlog4j.configuration=file:"$FLINK_CONF_DIR"/log4j-yarn-session.properties -Dlogback.configurationFile=file:"$FLINK_CONF_DIR"/logback-yarn.xml"

export FLINK_CONF_DIR

$JAVA_RUN $JVM_ARGS -classpath "$CC_CLASSPATH" $log_setting org.apache.flink.yarn.cli.FlinkYarnSessionCli -j "$FLINK_LIB_DIR"/flink-dist*.jar "$@"
  • 设置jvm参数

  • 设置log配置

  • 调用FlinkYarnSessionCli执行

脚本使用指南

Usage:
   Required
     -n,--container <arg>   Number of YARN container to allocate (=Number of Task Managers)
   Optional
     -D <arg>                        Dynamic properties
     -d,--detached                   Start detached
     -jm,--jobManagerMemory <arg>    Memory for JobManager Container with optional unit (default: MB)
     -nm,--name                      Set a custom name for the application on YARN
     -q,--query                      Display available YARN resources (memory, cores)
     -qu,--queue <arg>               Specify YARN queue.
     -s,--slots <arg>                Number of slots per TaskManager
     -tm,--taskManagerMemory <arg>   Memory per TaskManager Container with optional unit (default: MB)
     -z,--zookeeperNamespace <arg>   Namespace to create the Zookeeper sub-paths for HA mode

FlinkYarnSessionCli.java

入口类:org.apache.flink.yarn.cli.FlinkYarnSessionCli.java
main方法中调用run方法执行on yarn的部署。

# main
...
final FlinkYarnSessionCli cli = new FlinkYarnSessionCli(
    flinkConfiguration,
    configurationDirectory,
    "",
    ""); // no prefix for the YARN session

SecurityUtils.install(new SecurityConfiguration(flinkConfiguration));

retCode = SecurityUtils.getInstalledContext().runSecured(() -> cli.run(args));
...

run方法中,第一步是获取参数进行参数解析

final CommandLine cmd = parseCommandLineOptions(args, true);

命令行解析完毕后有限匹配是否为help模式。如果是直接打印帮助信息。

if (cmd.hasOption(help.getOpt())) {
    printUsage();
    return 0;
}

判断是否包含-q的操作,则调用yarnClusterDescriptor.getClusterDescription()打印yarn的资源信息后退出

if (cmd.hasOption(query.getOpt())) {
    final String description = yarnClusterDescriptor.getClusterDescription();
    System.out.println(description);
    return 0;
}

如果不是-q或者-h这类操作,则进入主流程。
主流程开始先判断是否包含applicationId操作。
该操作命令用法如下:

Usage:
   Required
     -id,--applicationId <yarnAppId> YARN application Id

# 附加到正在运行的Flink YARN会话application_1463870264508_0029
   # 示例:./bin/yarn-session.sh -id application_1463870264508_0029

传入applicationId,通过yarnClusterDescriptor进行retrieve(检索)获取clusterClient对象。

if (cmd.hasOption(applicationId.getOpt())) {
    yarnApplicationId = ConverterUtils.toApplicationId(cmd.getOptionValue(applicationId.getOpt()));

clusterClient = yarnClusterDescriptor.retrieve(yarnApplicationId);
}

若不包含applicationId,则调用deploySessionCluster进行部署。

final ClusterSpecification clusterSpecification = getClusterSpecification(cmd);
clusterClient = yarnClusterDescriptor.deploySessionCluster(clusterSpecification);

//------------------ ClusterClient deployed, handle connection details
yarnApplicationId = clusterClient.getClusterId();

try {
    final LeaderConnectionInfo connectionInfo = clusterClient.getClusterConnectionInfo();

System.out.println("Flink JobManager is now running on " + connectionInfo.getHostname() +':' + connectionInfo.getPort() + " with leader id " + connectionInfo.getLeaderSessionID() + '.');
    System.out.println("JobManager Web Interface: " + clusterClient.getWebInterfaceURL());

writeYarnPropertiesFile(
        yarnApplicationId,
        clusterSpecification.getNumberTaskManagers() * clusterSpecification.getSlotsPerTaskManager(),
        yarnClusterDescriptor.getDynamicPropertiesEncoded());
} catch (Exception e) {
    try {
        clusterClient.shutdown();
    } catch (Exception ex) {
        LOG.info("Could not properly shutdown cluster client.", ex);
    }

try {
        yarnClusterDescriptor.killCluster(yarnApplicationId);
    } catch (FlinkException fe) {
        LOG.info("Could not properly terminate the Flink cluster.", fe);
    }
    throw new FlinkException("Could not write the Yarn connection information.", e);
}

包含如下步骤

  • deploySessionCluster部署clusterClient

  • 获取ApplicationId

  • 通过clusterClient 获取LeaderConnectionInfo

  • 写入yarn属性信息

on yarn部署还涉及到客户端是否分离的问题,yarn-sesion.sh脚本中指定-d--detached,可以启动分离的YarnSession,而不需要客户端一直运行。

if (yarnClusterDescriptor.isDetachedMode()) {
    LOG.info("The Flink YARN client has been started in detached mode. In order to stop " + "Flink on YARN, use the following command or a YARN web interface to stop it:\n" + "yarn application -kill " + yarnApplicationId);
}

注:在这种情况下,Flink YARN客户端将仅向群集提交Flink,然后自行关闭。请注意,在这种情况下,无法使用Flink停止YARN会话。
使用YARN实用程序(yarn application -kill)来停止YARN会话。

如不指定分离方式,客户端需要持续运行,可以通过ctrl+c或者stop来停止。
以下是非分离方式的代码内容:

ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor();

final YarnApplicationStatusMonitor yarnApplicationStatusMonitor = new YarnApplicationStatusMonitor(
    yarnClusterDescriptor.getYarnClient(),
    yarnApplicationId,
    new ScheduledExecutorServiceAdapter(scheduledExecutorService));

try {
    runInteractiveCli(
        clusterClient,
        yarnApplicationStatusMonitor,
        acceptInteractiveInput);
 } finally {
    try {
        yarnApplicationStatusMonitor.close();
    } catch (Exception e) {
        LOG.info("Could not properly close the Yarn application status monitor.", e);
    }

clusterClient.shutDownCluster();

try {
        clusterClient.shutdown();
    } catch (Exception e) {
        LOG.info("Could not properly shutdown cluster client.", e);
    }

// shut down the scheduled executor service
    ExecutorUtils.gracefulShutdown(
        1000L,
        TimeUnit.MILLISECONDS,
        scheduledExecutorService);

deleteYarnPropertiesFile();

ApplicationReport applicationReport;

try {
        applicationReport = yarnClusterDescriptor
            .getYarnClient()
            .getApplicationReport(yarnApplicationId);
    } catch (YarnException | IOException e) {
        LOG.info("Could not log the final application report.", e);
        applicationReport = null;
    }

if (applicationReport != null) {
        logFinalApplicationReport(applicationReport);
    }
}
  • 创建scheduledExecutorService

  • 创建状态监听器yarnApplicationStatusMonitor

  • 运行交互式客户端runInteractiveCli

  • 一系列关闭操作。

  • 生成Application报表

runInteractiveCli方法

while (continueRepl) {
    final ApplicationStatus applicationStatus = yarnApplicationStatusMonitor.getApplicationStatusNow();

switch (applicationStatus) {
        case FAILED:
        case CANCELED:
            System.err.println("The Flink Yarn cluster has failed.");
            continueRepl = false;
            break;
        case UNKNOWN:
            if (!isLastStatusUnknown) {
                unknownStatusSince = System.nanoTime();
                isLastStatusUnknown = true;
            }

if ((System.nanoTime() - unknownStatusSince) > 5L * CLIENT_POLLING_INTERVAL_MS * 1_000_000L) {
                System.err.println("The Flink Yarn cluster is in an unknown state. Please check the Yarn cluster.");
                continueRepl = false;
            } else {
                continueRepl = repStep(in, readConsoleInput);
            }
            break;
        case SUCCEEDED:
            if (isLastStatusUnknown) {
                isLastStatusUnknown = false;
            }

// ------------------ check if there are updates by the cluster -----------
            try {
                final GetClusterStatusResponse status = clusterClient.getClusterStatus();

if (status != null && numTaskmanagers != status.numRegisteredTaskManagers()) {
                    System.err.println("Number of connected TaskManagers changed to " +
                    status.numRegisteredTaskManagers() + ". " +
                    "Slots available: " + status.totalNumberOfSlots());
                    numTaskmanagers = status.numRegisteredTaskManagers();
                 }
            } catch (Exception e) {
                LOG.warn("Could not retrieve the current cluster status. Skipping current retrieval attempt ...", e);
            }

printClusterMessages(clusterClient);

continueRepl = repStep(in, readConsoleInput);
    }
}

方法内部主要是一个while循环体,判断条件是continueRepl,循环体内部逻辑

  • 通过yarnApplicationStatusMonitor.getApplicationStatusNow()获取ApplicationStatus状态信息

  • switch判断ApplicationStatus是FAILED、CANCELED、UNKNOWN还是SUCCEEDED

  • FAILED、CANCELED continueRepl为false,跳出循环

  • UNKNOWN 会对比当前的时间与最后unknown的时间,如果大于5L*CLIENT_POLLING_INTERVAL_MS * 1_000_000L,则continueRepl为false,跳出循环;否则继续循环

  • SUCCEEDED

    • 获取集群状态信息

    • 验证集群所注册的TaskManager数量与所指定的数量是否相符,不相符则打印err日志

    • 打印集群消息

    • 调用repStep方法获取continueRepl值

repStep方法主要是用于交互式方式接受用户输入:quit,stop,help。

private static boolean repStep(
    BufferedReader in,
    boolean readConsoleInput) throws IOException, InterruptedException {

// wait until CLIENT_POLLING_INTERVAL is over or the user entered something.
    long startTime = System.currentTimeMillis();
    while ((System.currentTimeMillis() - startTime) < CLIENT_POLLING_INTERVAL_MS
        && (!readConsoleInput || !in.ready())) {
        Thread.sleep(200L);
    }
    //------------- handle interactive command by user. ----------------------

if (readConsoleInput && in.ready()) {
        String command = in.readLine();
        switch (command) {
            case "quit":
            case "stop":
                return false;

case "help":
                System.err.println(YARN_SESSION_HELP);
                break;
            default:
                System.err.println("Unknown command '" + command + "'. Showing help:");
                System.err.println(YARN_SESSION_HELP);
                break;
        }
    }

return true;
}

用户通过客户端输入quitstop,返回false;其他则返回true

run()方法最后执行yarnClusterDescriptor.close

yarnClusterDescriptor.close();

外层的流程讲完了,我们来看下部署YarnSession集群的过程。

YarnSession部署

640?wx_fmt=jpeg

部署通过AbstractYarnClusterDescriptor.deploySessionCluster方法,调用deployInternal()执行部署。

@Override
public ClusterClient<ApplicationId> deploySessionCluster(ClusterSpecification clusterSpecification) throws ClusterDeploymentException {
    try {
        return deployInternal(
            clusterSpecification,
            "Flink session cluster",
            getYarnSessionClusterEntrypoint(),
            null,
            false);
    } catch (Exception e) {
        throw new ClusterDeploymentException("Couldn't deploy Yarn session cluster", e);
    }
}

deployInternal()方法将阻塞,直到将ApplicationMaster/JobManager部署到纱线上为止。

由于代码块较长,这里我们做代码拆分展示分析。

1、配置验证

validateClusterSpecification(clusterSpecification);

if (UserGroupInformation.isSecurityEnabled()) {
    // note: UGI::hasKerberosCredentials inaccurately reports false
    // for logins based on a keytab (fixed in Hadoop 2.6.1, see HADOOP-10786),
    // so we check only in ticket cache scenario.
    boolean useTicketCache = flinkConfiguration.getBoolean(SecurityOptions.KERBEROS_LOGIN_USETICKETCACHE);

UserGroupInformation loginUser = UserGroupInformation.getCurrentUser();
    if (loginUser.getAuthenticationMethod() == UserGroupInformation.AuthenticationMethod.KERBEROS
    && useTicketCache && !loginUser.hasKerberosCredentials()) {
        LOG.error("Hadoop security with Kerberos is enabled but the login user does not have Kerberos credentials");
        throw new RuntimeException("Hadoop security with Kerberos is enabled but the login user " +
        "does not have Kerberos credentials");
    }
}

isReadyForDeployment(clusterSpecification);

validateClusterSpecification方法主要是读取taskManagerMemorySize以及计算cutoff

private void validateClusterSpecification(ClusterSpecification clusterSpecification) throws FlinkException {
        try {
            final long taskManagerMemorySize = clusterSpecification.getTaskManagerMemoryMB();
            // We do the validation by calling the calculation methods here
            // Internally these methods will check whether the cluster can be started with the provided
            // ClusterSpecification and the configured memory requirements
            final long cutoff = ContaineredTaskManagerParameters.calculateCutoffMB(flinkConfiguration, taskManagerMemorySize);
            TaskManagerServices.calculateHeapSizeMB(taskManagerMemorySize - cutoff, flinkConfiguration);
        } catch (IllegalArgumentException iae) {
            throw new FlinkException("Cannot fulfill the minimum memory requirements with the provided " +
                "cluster specification. Please increase the memory of the cluster.", iae);
        }
    }

之后则是验证是否启动Security,进行安全验证,最后判断是否可执行部署。

2、检查指定的yarn queue

// ------------------ Check if the specified queue exists --------------------
checkYarnQueues(yarnClient);

3、读取并设置动态属性

// ------------------ Add dynamic properties to local flinkConfiguraton ------
Map<String, String> dynProperties = getDynamicProperties(dynamicPropertiesEncoded);
for (Map.Entry<String, String> dynProperty : dynProperties.entrySet()) {
    flinkConfiguration.setString(dynProperty.getKey(), dynProperty.getValue());
}

4、检查yarn集群是否能满足资源请求.

  • 创建YarnClientApplication 以及 GetNewApplicationResponse

// Create application via yarnClient
final YarnClientApplication yarnApplication = yarnClient.createApplication();
final GetNewApplicationResponse appResponse = yarnApplication.getNewApplicationResponse();
  • 验证集群资源

Resource maxRes = appResponse.getMaximumResourceCapability();

final ClusterResourceDescription freeClusterMem;
try {
    freeClusterMem = getCurrentFreeClusterResources(yarnClient);
} catch (YarnException | IOException e) {
    failSessionDuringDeployment(yarnClient, yarnApplication);
    throw new YarnDeploymentException("Could not retrieve information about free cluster resources.", e);
}

final int yarnMinAllocationMB = yarnConfiguration.getInt(YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB, 0);

final ClusterSpecification validClusterSpecification;
try {
    validClusterSpecification = validateClusterResources(
        clusterSpecification,
        yarnMinAllocationMB,
        maxRes,
        freeClusterMem);
} catch (YarnDeploymentException yde) {
    failSessionDuringDeployment(yarnClient, yarnApplication);
    throw yde;
}

LOG.info("Cluster specification: {}", validClusterSpecification);
  • 获取ExecutionMode,执行startAppMaster并返回YarnClusterClient对象

final ClusterEntrypoint.ExecutionMode executionMode = detached ?
    ClusterEntrypoint.ExecutionMode.DETACHED
    : ClusterEntrypoint.ExecutionMode.NORMAL;

flinkConfiguration.setString(ClusterEntrypoint.EXECUTION_MODE, executionMode.toString());

ApplicationReport report = startAppMaster(
    flinkConfiguration,
    applicationName,
    yarnClusterEntrypoint,
    jobGraph,
    yarnClient,
    yarnApplication,
    validClusterSpecification);

String host = report.getHost();
int port = report.getRpcPort();

// Correctly initialize the Flink config
flinkConfiguration.setString(JobManagerOptions.ADDRESS, host);
flinkConfiguration.setInteger(JobManagerOptions.PORT, port);

flinkConfiguration.setString(RestOptions.ADDRESS, host);
flinkConfiguration.setInteger(RestOptions.PORT, port);

// the Flink cluster is deployed in YARN. Represent cluster
return createYarnClusterClient(
    this,
    validClusterSpecification.getNumberTaskManagers(),
    validClusterSpecification.getSlotsPerTaskManager(),
    report,
    flinkConfiguration,
    true);

由于篇幅问题,至于startAppMaster这里就不再深入分析,有兴趣的朋友可以自行阅读。或者在后续文章中再做详细讲解。

任务执行入口

640?wx_fmt=jpeg

程序执行入口同样是$FLINK_HOME/bin/flink run,通过CliFrontend.run调用runProgram()来运行项目。

1、实例化ClusterDescriptor

ClusterDescriptor<T> clusterDescriptor = customCommandLine.createClusterDescriptor(commandLine);

此处的customCommandLine对象一个FlinkYarnSessionCli实例。

2、获取clusterId并结合DetachedMode判断处理逻辑

if (clusterId == null && runOptions.getDetachedMode()) {
    ...
    final JobGraph jobGraph = PackagedProgramUtils.createJobGraph(program, configuration, parallelism);

final ClusterSpecification clusterSpecification = customCommandLine.getClusterSpecification(commandLine);
    client = clusterDescriptor.deployJobCluster(
        clusterSpecification,
        jobGraph,
        runOptions.getDetachedMode());
    ...
}else{
    if (clusterId != null) {
        client = clusterDescriptor.retrieve(clusterId);
        shutdownHook = null;
    } else {
        // also in job mode we have to deploy a session cluster because the job
        // might consist of multiple parts (e.g. when using collect)
        final ClusterSpecification clusterSpecification = customCommandLine.getClusterSpecification(commandLine);
        client = clusterDescriptor.deploySessionCluster(clusterSpecification);
        // if not running in detached mode, add a shutdown hook to shut down cluster if client exits
        // there's a race-condition here if cli is killed before shutdown hook is installed
        if (!runOptions.getDetachedMode() && runOptions.isShutdownOnAttachedExit()) {
            shutdownHook = ShutdownHookUtil.addShutdownHook(client::shutDownCluster, client.getClass().getSimpleName(), LOG);
        } else {
            shutdownHook = null;
        }
    }
    ...
}

第一个if中主要是用于DetachedMode(客户端分离模式)

  • 创建JobGraph

  • 获取ClusterSpecification

  • 部署任务集群deployJobCluster
    如果条件不成立则走else。
    第二个if中如果clusterId不为空,则通过clusterDescriptor.retrieve获取client对象。
    否则通过clusterDescriptor.deploySessionCluster部署,获取client对象

3、通过client对象执行任务

executeProgram(program, client, userParallelism);

4、执行获取JobSubmissionResult结果

protected void executeProgram(PackagedProgram program, ClusterClient<?> client, int parallelism) throws ProgramMissingJobException, ProgramInvocationException {
    logAndSysout("Starting execution of program");

final JobSubmissionResult result = client.run(program, parallelism);
    ....
}

5、client提交任务

经过多层run方法调用最终执行YarnClusterClient.submitJob()方法。

public JobSubmissionResult submitJob(JobGraph jobGraph, ClassLoader classLoader) throws ProgramInvocationException {
        if (isDetached()) {
            if (newlyCreatedCluster) {
                stopAfterJob(jobGraph.getJobID());
            }
            return super.runDetached(jobGraph, classLoader);
        } else {
            return super.run(jobGraph, classLoader);
        }
    }

之后的方法调用与前一篇文章《Flink源码解析 | 从Example出发:读懂集群任务执行流程》一致。需要进一步去阅读了解。

之后的文章准备讲下采用start-scala-shell.sh脚本执行的流程,各类Graph的生成以及actor系统。之后会将以上这些文章整理出一篇完整的Apache Flink的任务执行流程总结文章出来,尽请期待。


About Joyk


Aggregate valuable and interesting links.
Joyk means Joy of geeK