Source code analysis of the YARN client-mode execution mechanism
Run the following command:
bin/spark-submit \
--class org.apache.spark.examples.SparkPi \
--master yarn \
--deploy-mode client \
./examples/jars/spark-examples_2.11-2.1.1.jar \
100
The startup class:
/opt/module/jdk1.8.0_172/bin/java
-cp /opt/module/spark-yarn/conf/:/opt/module/spark-yarn/jars/*:/opt/module/hadoop-2.7.2/etc/hadoop/
-Xmx1g
org.apache.spark.deploy.SparkSubmit
--master yarn
--deploy-mode client
--class org.apache.spark.examples.SparkPi
./examples/jars/spark-examples_2.11-2.1.1.jar 100
Three different processes are started, in order:
SparkSubmit
ExecutorLauncher
CoarseGrainedExecutorBackend
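(If you run jps while the job is active, SparkSubmit shows up on the submitting machine, while ExecutorLauncher and CoarseGrainedExecutorBackend show up inside YARN containers on the NodeManager hosts.)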
In client mode the user's main class is run directly. The relevant logic is in the prepareSubmitEnvironment method:
/*
 In client mode, launch the user's main class directly.
*/
if (deployMode == CLIENT || isYarnCluster) {
  // in client mode, childMainClass is the user's class;
  // in cluster mode it is later reassigned to org.apache.spark.deploy.yarn.Client
  childMainClass = args.mainClass
}
So at this point no ApplicationMaster is created; SparkSubmit directly invokes the user class's main method.
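As a minimal sketch (not Spark's actual runMain implementation; the class name and argument are taken from the spark-submit example above), the invocation amounts to loading the class and calling its static main via reflection:

object InvokeUserMain {
  def main(args: Array[String]): Unit = {
    // values taken from the spark-submit example above; illustrative only
    val childMainClass = "org.apache.spark.examples.SparkPi" // from --class
    val childArgs = Array("100")                             // application arguments

    val mainClass = Class.forName(childMainClass) // the user jar is on the classpath
    val mainMethod = mainClass.getMethod("main", classOf[Array[String]])
    mainMethod.invoke(null, childArgs) // static main, so the receiver is null
  }
}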
The user's main method then goes on to instantiate the SparkContext:
val (sched, ts): (SchedulerBackend, TaskScheduler) = SparkContext.createTaskScheduler(this, master, deployMode)
_schedulerBackend = sched
_taskScheduler = ts
// start the TaskScheduler (a YarnScheduler in yarn-client mode)
_taskScheduler.start()
The SparkContext.createTaskScheduler method, key code:
private def createTaskScheduler(
    sc: SparkContext,
    master: String,
    deployMode: String): (SchedulerBackend, TaskScheduler) = {
  import SparkMasterRegex._
  master match {
    case masterUrl =>
      // for master "yarn" this resolves to YarnClusterManager
      val cm = getClusterManager(masterUrl) match {
        case Some(clusterMgr) => clusterMgr
        case None => throw new SparkException("Could not parse Master URL: '" + master + "'")
      }
      try {
        // creates a YarnScheduler (in client mode)
        val scheduler: TaskScheduler = cm.createTaskScheduler(sc, masterUrl)
        // creates a YarnClientSchedulerBackend (in client mode)
        val backend: SchedulerBackend = cm.createSchedulerBackend(sc, masterUrl, scheduler)
        cm.initialize(scheduler, backend)
        (backend, scheduler)
      } catch {
        // error handling elided
      }
  }
}
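getClusterManager is not shown above; Spark discovers cluster managers through the standard java.util.ServiceLoader mechanism. The following sketch (illustrative, not the actual private method; note that ExternalClusterManager is private[spark], so real code must live under an org.apache.spark package) shows roughly how "yarn" resolves to YarnClusterManager:

import java.util.ServiceLoader
import scala.collection.JavaConverters._
import org.apache.spark.scheduler.ExternalClusterManager

// Scan all ExternalClusterManager implementations registered via
// META-INF/services and keep the one whose canCreate matches the master URL.
// YarnClusterManager.canCreate returns true exactly for "yarn".
def findClusterManager(masterUrl: String): Option[ExternalClusterManager] = {
  val loader = Thread.currentThread().getContextClassLoader
  ServiceLoader.load(classOf[ExternalClusterManager], loader)
    .asScala
    .find(_.canCreate(masterUrl)) // the real code also rejects multiple matches
}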
The YarnClusterManager class:
private[spark] class YarnClusterManager extends ExternalClusterManager {
override def canCreate(masterURL: String): Boolean = {
masterURL == "yarn"
}
override def createTaskScheduler(sc: SparkContext, masterURL: String): TaskScheduler = {
sc.deployMode match {
case "cluster" => new YarnClusterScheduler(sc)
case "client" => new YarnScheduler(sc)
case _ => throw new SparkException(s"Unknown deploy mode '${sc.deployMode}' for Yarn")
}
}
override def createSchedulerBackend(sc: SparkContext,
masterURL: String,
scheduler: TaskScheduler): SchedulerBackend = {
sc.deployMode match {
case "cluster" =>
new YarnClusterSchedulerBackend(scheduler.asInstanceOf[TaskSchedulerImpl], sc)
case "client" =>
new YarnClientSchedulerBackend(scheduler.asInstanceOf[TaskSchedulerImpl], sc)
case _ =>
throw new SparkException(s"Unknown deploy mode '${sc.deployMode}' for Yarn")
}
}
override def initialize(scheduler: TaskScheduler, backend: SchedulerBackend): Unit = {
scheduler.asInstanceOf[TaskSchedulerImpl].initialize(backend)
}
}
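In client mode, then, the pair handed back to SparkContext is (YarnScheduler, YarnClientSchedulerBackend); in cluster mode it would be (YarnClusterScheduler, YarnClusterSchedulerBackend). Because YarnClusterManager is discovered through the ServiceLoader registration sketched above, SparkContext itself contains no YARN-specific code; the yarn module plugs itself in.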
_taskScheduler.start() then calls into the start method of YarnClientSchedulerBackend:
/**
 * Create a Yarn client to submit an application to the ResourceManager.
 * This waits until the application is running.
 */
override def start() {
  val driverHost = conf.get("spark.driver.host")
  val driverPort = conf.get("spark.driver.port")
  val hostport = driverHost + ":" + driverPort
  val argsArrayBuf = new ArrayBuffer[String]()
  // pass the driver's address so the (yet to be started) AM can connect back
  argsArrayBuf += ("--arg", hostport)
  val args = new ClientArguments(argsArrayBuf.toArray)
  client = new Client(args, conf)
  // submit the application through the Client
  bindToYarn(client.submitApplication(), None)
  waitForApplication()
}
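waitForApplication blocks until YARN reports the application as running. A hedged sketch of the idea, using only the Hadoop YarnClient API (the loop shape and polling interval are illustrative, not Spark's exact code):

import org.apache.hadoop.yarn.api.records.{ApplicationId, YarnApplicationState}
import org.apache.hadoop.yarn.client.api.YarnClient

// Poll the ResourceManager until the application leaves the submitted/accepted
// states. Spark's real implementation also fails fast on FAILED/KILLED.
def waitUntilRunning(yarnClient: YarnClient, appId: ApplicationId): Unit = {
  var state = yarnClient.getApplicationReport(appId).getYarnApplicationState
  while (state == YarnApplicationState.NEW ||
         state == YarnApplicationState.NEW_SAVING ||
         state == YarnApplicationState.SUBMITTED ||
         state == YarnApplicationState.ACCEPTED) {
    Thread.sleep(1000) // illustrative polling interval
    state = yarnClient.getApplicationReport(appId).getYarnApplicationState
  }
}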
Revisiting the org.apache.spark.deploy.yarn.Client source: its submitApplication method ultimately calls yarnClient.submitApplication(appContext).
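An abridged view of that method (reconstructed from the Spark 2.1 source, so treat it as a sketch; logging and error handling are elided): it asks the ResourceManager for a new application id, builds the AM container launch context, and submits the application.

def submitApplication(): ApplicationId = {
  yarnClient.init(yarnConf)
  yarnClient.start()
  // 1. ask the ResourceManager for a new application id
  val newApp = yarnClient.createApplication()
  val newAppResponse = newApp.getNewApplicationResponse()
  appId = newAppResponse.getApplicationId()
  // 2. build the launch context for the AM container;
  //    this is where the AM main class is chosen (see below)
  val containerContext = createContainerLaunchContext(newAppResponse)
  val appContext = createApplicationSubmissionContext(newApp, containerContext)
  // 3. hand the application over to the ResourceManager
  yarnClient.submitApplication(appContext)
  appId
}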
The ExecutorLauncher class. When yarnClient submits the application, the main class to run in the AM container (here ExecutorLauncher) is packed into the launch context, so what gets started is not ApplicationMaster but ExecutorLauncher:
// in Client.createContainerLaunchContext()
val amClass =
  if (isClusterMode) {
    Utils.classForName("org.apache.spark.deploy.yarn.ApplicationMaster").getName
  } else {
    Utils.classForName("org.apache.spark.deploy.yarn.ExecutorLauncher").getName
  }
/**
 * This object does not provide any special functionality. It exists so that it's easy to tell
 * apart the client-mode AM from the cluster-mode AM when using tools such as ps or jps.
 */
object ExecutorLauncher {
  def main(args: Array[String]): Unit = {
    ApplicationMaster.main(args)
  }
}
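Because main simply delegates to ApplicationMaster.main, client mode and cluster mode share all of the actual AM logic; the separate object exists only so that the process name reported by ps/jps differs.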
Revisiting the ApplicationMaster source: the run method:
final def run(): Int = {
  try {
    if (isClusterMode) {
      runDriver(securityMgr)
    } else {
      // client mode: run the ExecutorLauncher path; the driver is already
      // running inside the SparkSubmit process, so no driver is started here
      runExecutorLauncher(securityMgr)
    }
  } catch {
    // error handling elided
  }
  exitCode
}
The runExecutorLauncher method:
private def runExecutorLauncher(securityMgr: SecurityManager): Unit = {
  // block until the driver (inside the SparkSubmit process) is reachable
  val driverRef = waitForSparkDriver()
  addAmIpFilter()
  // register this AM with the ResourceManager
  registerAM(sparkConf, rpcEnv, driverRef, sparkConf.get("spark.driver.appUIAddress", ""),
    securityMgr)
  // In client mode the actor will stop the reporter thread.
  reporterThread.join()
}
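waitForSparkDriver uses the host:port that YarnClientSchedulerBackend passed via --arg (see its start method above). A hedged sketch of the reachability probe it performs (names and retry interval are illustrative):

import java.io.IOException
import java.net.Socket

// Probe the driver's host:port with a plain socket until the driver
// (running inside the SparkSubmit process) accepts a connection; the real
// code then sets up an RPC endpoint to it and gives up after a deadline.
def waitForDriver(driverHost: String, driverPort: Int): Unit = {
  var driverUp = false
  while (!driverUp) {
    try {
      val socket = new Socket(driverHost, driverPort)
      socket.close()
      driverUp = true // the driver accepted the connection
    } catch {
      case _: IOException => Thread.sleep(100) // retry shortly
    }
  }
}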