Source Code Analysis of the YARN Client Mode Execution Flow

Run the following command:

bin/spark-submit \
--class org.apache.spark.examples.SparkPi \
--master yarn \
--deploy-mode client \
./examples/jars/spark-examples_2.11-2.1.1.jar \
100

The JVM command that gets launched:

/opt/module/jdk1.8.0_172/bin/java 
-cp /opt/module/spark-yarn/conf/:/opt/module/spark-yarn/jars/*:/opt/module/hadoop-2.7.2/etc/hadoop/ 
-Xmx1g 
org.apache.spark.deploy.SparkSubmit 
--master yarn 
--deploy-mode client 
--class org.apache.spark.examples.SparkPi 
./examples/jars/spark-examples_2.11-2.1.1.jar 100

Three different processes are started, in order:

SparkSubmit
ExecutorLauncher
CoarseGrainedExecutorBackend

In client mode, the user's main class is run directly:

The prepareSubmitEnvironment method

/*
    In client mode, launch the user's main class directly
*/
if (deployMode == CLIENT || isYarnCluster) {
    // in client mode, childMainClass is the user's own class
    // in cluster mode, childMainClass is later reassigned to org.apache.spark.deploy.yarn.Client
    childMainClass = args.mainClass
}
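SparkSubmit then launches this childMainClass inside the same JVM via reflection (this is, in essence, what its runMain method does). Below is a minimal sketch of that mechanism; MyApp is a hypothetical stand-in for a user class such as org.apache.spark.examples.SparkPi, and ReflectiveLauncher is a made-up name:

```scala
// Sketch of launching a main class by reflection in the current JVM,
// similar in spirit to SparkSubmit.runMain.
// MyApp stands in for the user's class; it records the args it received.
object MyApp {
  var lastArgs: Array[String] = Array.empty
  def main(args: Array[String]): Unit = { lastArgs = args }
}

object ReflectiveLauncher {
  def launch(childMainClass: String, childArgs: Array[String]): Unit = {
    // load the class by its fully qualified name
    val mainClass = Class.forName(childMainClass)
    // look up the static main(String[]) entry point
    val mainMethod = mainClass.getMethod("main", classOf[Array[String]])
    // static method, so the receiver is null; the array is passed as one argument
    mainMethod.invoke(null, childArgs)
  }
}
```

Because the user class runs in the SparkSubmit JVM, that JVM itself becomes the driver in client mode.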

No ApplicationMaster is created at this point; instead, the main method of the user class is executed directly.

The user's main method then instantiates a SparkContext.

Instantiating SparkContext

val (sched, ts):(SchedulerBackend, TaskScheduler) = SparkContext.createTaskScheduler(this, master, deployMode)
_schedulerBackend = sched
_taskScheduler = ts
// start the YarnScheduler
_taskScheduler.start()

The SparkContext.createTaskScheduler method

Key code:

private def createTaskScheduler(
                                   sc: SparkContext,
                                   master: String,
                                   deployMode: String): (SchedulerBackend, TaskScheduler) = {
    import SparkMasterRegex._

    master match {

        case masterUrl =>
            // for master "yarn" this resolves to YarnClusterManager
            val cm = getClusterManager(masterUrl) match {
                case Some(clusterMgr) => clusterMgr
                case None => throw new SparkException("Could not parse Master URL: '" + master + "'")
            }
            try {
                // create the YarnScheduler
                val scheduler: TaskScheduler = cm.createTaskScheduler(sc, masterUrl)
                // create the YarnClientSchedulerBackend
                val backend: SchedulerBackend = cm.createSchedulerBackend(sc, masterUrl, scheduler)
                cm.initialize(scheduler, backend)
                (backend, scheduler)
            } catch {
                // error handling elided in this excerpt
            }
    }
}
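getClusterManager returns the one ExternalClusterManager whose canCreate accepts the master URL; Spark discovers the candidates via java.util.ServiceLoader. The following is a simplified sketch of that selection logic, with made-up manager objects standing in for the real ServiceLoader machinery:

```scala
// Simplified sketch of the canCreate-based lookup done by
// SparkContext.getClusterManager. FakeYarnManager / FakeLocalManager are
// made-up stand-ins; real Spark loads ExternalClusterManager implementations
// through java.util.ServiceLoader.
trait MiniClusterManager {
  def canCreate(masterURL: String): Boolean
  def name: String
}

object FakeYarnManager extends MiniClusterManager {
  def canCreate(masterURL: String): Boolean = masterURL == "yarn"
  def name: String = "yarn-manager"
}

object FakeLocalManager extends MiniClusterManager {
  def canCreate(masterURL: String): Boolean = masterURL.startsWith("local")
  def name: String = "local-manager"
}

object ManagerRegistry {
  // in real Spark this sequence comes from ServiceLoader
  private val loaded = Seq(FakeYarnManager, FakeLocalManager)

  // mirrors the Some/None handling in createTaskScheduler above
  def getClusterManager(url: String): Option[MiniClusterManager] = {
    val matching = loaded.filter(_.canCreate(url))
    if (matching.size > 1)
      throw new IllegalStateException(s"Multiple cluster managers registered for $url")
    matching.headOption
  }
}
```

With master "yarn" exactly one manager matches, which is why the None branch above throws "Could not parse Master URL".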

YarnClusterManager

private[spark] class YarnClusterManager extends ExternalClusterManager {

    override def canCreate(masterURL: String): Boolean = {
        masterURL == "yarn"
    }

    override def createTaskScheduler(sc: SparkContext, masterURL: String): TaskScheduler = {
        sc.deployMode match {
            case "cluster" => new YarnClusterScheduler(sc)
            case "client" => new YarnScheduler(sc)
            case _ => throw new SparkException(s"Unknown deploy mode '${sc.deployMode}' for Yarn")
        }
    }

    override def createSchedulerBackend(sc: SparkContext,
                                        masterURL: String,
                                        scheduler: TaskScheduler): SchedulerBackend = {
        sc.deployMode match {
            case "cluster" =>
                new YarnClusterSchedulerBackend(scheduler.asInstanceOf[TaskSchedulerImpl], sc)
            case "client" =>
                new YarnClientSchedulerBackend(scheduler.asInstanceOf[TaskSchedulerImpl], sc)
            case _ =>
                throw new SparkException(s"Unknown deploy mode '${sc.deployMode}' for Yarn")
        }
    }

    override def initialize(scheduler: TaskScheduler, backend: SchedulerBackend): Unit = {
        scheduler.asInstanceOf[TaskSchedulerImpl].initialize(backend)
    }
}
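The initialize call above is what ties the two halves together: the scheduler keeps a reference to the backend, and starting the scheduler starts the backend. A minimal sketch of this wiring, where MiniScheduler and RecordingBackend are made-up stand-ins for TaskSchedulerImpl and YarnClientSchedulerBackend:

```scala
// Sketch of the scheduler/backend wiring: initialize hands the backend to the
// scheduler, and the scheduler's start() delegates to backend.start(), which
// is exactly the relationship between TaskSchedulerImpl and its
// SchedulerBackend. MiniScheduler / MiniBackend are made-up stand-ins.
trait MiniBackend { def start(): Unit }

class MiniScheduler {
  private var backend: Option[MiniBackend] = None
  def initialize(b: MiniBackend): Unit = { backend = Some(b) }
  // TaskSchedulerImpl.start() likewise calls backend.start()
  def start(): Unit = backend.foreach(_.start())
}

class RecordingBackend extends MiniBackend {
  var started = false
  def start(): Unit = { started = true }
}
```

So the `_taskScheduler.start()` call in SparkContext is what ultimately triggers YarnClientSchedulerBackend.start, shown next.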

_taskScheduler.start()

The YarnClientSchedulerBackend.start method


/**
  * Create a Yarn client to submit an application to the ResourceManager.
  * This waits until the application is running.
  */
override def start() {
    val driverHost = conf.get("spark.driver.host")
    val driverPort = conf.get("spark.driver.port")
    val hostport = driverHost + ":" + driverPort
    val argsArrayBuf = new ArrayBuffer[String]()
    argsArrayBuf += ("--arg", hostport)
    val args = new ClientArguments(argsArrayBuf.toArray)
    client = new Client(args, conf)
    // submit the application through the Client
    bindToYarn(client.submitApplication(), None)
    waitForApplication()
}
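Note the "--arg host:port" pair: the client passes its own driver address to the application master so that the ExecutorLauncher side can connect back to the driver. A sketch of how those arguments are assembled (buildAmArgs is a made-up helper name; the two conf keys are the real Spark ones):

```scala
import scala.collection.mutable.ArrayBuffer

// Sketch of the argument assembly in YarnClientSchedulerBackend.start():
// the driver's host:port is packed behind "--arg" for the AM to connect
// back to. buildAmArgs is a hypothetical helper, not a Spark method.
object AmArgs {
  def buildAmArgs(conf: Map[String, String]): Array[String] = {
    val driverHost = conf("spark.driver.host")
    val driverPort = conf("spark.driver.port")
    val hostport = driverHost + ":" + driverPort
    val argsArrayBuf = new ArrayBuffer[String]()
    argsArrayBuf += ("--arg", hostport)
    argsArrayBuf.toArray
  }
}
```

This is the channel through which the client-mode AM later locates the driver (see waitForSparkDriver in runExecutorLauncher below).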

Revisiting the org.apache.spark.deploy.yarn.Client source

The submitApplication method

yarnClient.submitApplication(appContext)

ExecutorLauncher

When yarnClient submits the application, the main class to launch is packed into the container launch context. In client mode that class is ExecutorLauncher, so what YARN starts is ExecutorLauncher rather than ApplicationMaster:

// createContainerLaunchContext()
val amClass =
    if (isClusterMode) {
        Utils.classForName("org.apache.spark.deploy.yarn.ApplicationMaster").getName
    } else {
        Utils.classForName("org.apache.spark.deploy.yarn.ExecutorLauncher").getName
    }
/**
  * This object does not provide any special functionality. It exists so that it's easy to tell
  * apart the client-mode AM from the cluster-mode AM when using tools such as ps or jps.
  */
object ExecutorLauncher {

    def main(args: Array[String]): Unit = {
        ApplicationMaster.main(args)
    }
}
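This delegating-entry-point trick is easy to reproduce: tools like ps and jps report the entry class name, so a wrapper object whose main simply forwards to the real one makes the two modes distinguishable at a glance. A minimal sketch with made-up names:

```scala
// Sketch of the ExecutorLauncher pattern: a wrapper entry point that only
// delegates to the real main, so process-listing tools show a different
// class name. RealMaster / LauncherAlias are made-up stand-ins for
// ApplicationMaster / ExecutorLauncher.
object RealMaster {
  var ran: Boolean = false
  def main(args: Array[String]): Unit = { ran = true }
}

object LauncherAlias {
  // exists only so the process name differs in ps/jps output
  def main(args: Array[String]): Unit = RealMaster.main(args)
}
```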

Revisiting the ApplicationMaster source

The run method

final def run(): Int = {
    try {
        if (isClusterMode) {
            runDriver(securityMgr)
        } else {
            // client mode: the driver is already running on the client,
            // so only the ExecutorLauncher path is executed here
            runExecutorLauncher(securityMgr)
        }
    } catch {
        // error handling elided in this excerpt
    }
    exitCode
}

runExecutorLauncher

private def runExecutorLauncher(securityMgr: SecurityManager): Unit = {
    val driverRef = waitForSparkDriver()
    addAmIpFilter()
    registerAM(sparkConf, rpcEnv, driverRef, sparkConf.get("spark.driver.appUIAddress", ""),
        securityMgr)

    // In client mode the actor will stop the reporter thread.
    reporterThread.join()
}

From this point on, the execution flow is the same as in yarn-cluster mode, so it is not repeated here.

Summary

In client mode, SparkSubmit runs the user's main class directly in the submitting JVM, which therefore acts as the driver. Instantiating SparkContext creates a YarnScheduler and a YarnClientSchedulerBackend; the backend's start method uses org.apache.spark.deploy.yarn.Client to submit the application to the ResourceManager, passing the driver's host:port via "--arg". YARN then launches ExecutorLauncher (a renamed wrapper around ApplicationMaster) as the application master, which connects back to the driver, registers the AM, and from there the flow of launching CoarseGrainedExecutorBackend executors is the same as in yarn-cluster mode.

Copyright © 尚硅谷大数据 2019, all rights reserved. Powered by Gitbook.
Last revised: 2019-06-11 12:34:39
