4.4 Stage 级别任务调度源码分析
SparkContext
初始化
任务调度的时候, 需要用到 3 个非常重要的组件, 都是在 SparkContext
初始化的时候创建并启动:
这三个组件分别是:
SchedulerBackend
TaskScheduler
DAGScheduler
// Used to communicate with other components (cluster manager, executors)
private var _schedulerBackend: SchedulerBackend = _
// DAG scheduler: one of the key components of the scheduling system. It creates Jobs,
// splits the RDDs of the DAG into different Stages, and submits those Stages.
// The Job/Stage monitoring data shown in the Spark UI all comes from DAGScheduler.
@volatile private var _dagScheduler: DAGScheduler = _
// TaskScheduler performs a second round of scheduling (per its scheduling algorithm) on the
// resources the cluster manager has already granted to the application, assigning them to tasks.
// The Tasks it schedules are created by DAGScheduler, so DAGScheduler is the
// upstream (front-end) scheduler of TaskScheduler.
private var _taskScheduler: TaskScheduler = _
// Create the SchedulerBackend and the TaskScheduler together
val (sched, ts):(SchedulerBackend, TaskScheduler) = SparkContext.createTaskScheduler(this, master, deployMode)
_schedulerBackend = sched
_taskScheduler = ts
// Create the DAGScheduler
_dagScheduler = new DAGScheduler(this)
// Start the TaskScheduler; internally this also starts the SchedulerBackend
_taskScheduler.start()
我们从一个
action
开始. 例如:collect
RDD
类源码分析
// Action: runs a job that materializes every partition into an array on the driver,
// then concatenates the per-partition arrays into one result array.
def collect(): Array[T] = withScope {
val results = sc.runJob(this, (iter: Iterator[T]) => iter.toArray)
Array.concat(results: _*)
}
sc.runJob
方法
// Entry point on SparkContext: delegates the job to the DAGScheduler.
def runJob[T, U: ClassTag](
rdd: RDD[T],
func: (TaskContext, Iterator[T]) => U,
partitions: Seq[Int],
resultHandler: (Int, U) => Unit): Unit = {
// Hand the job to the DAGScheduler, which splits it into stages
dagScheduler.runJob(rdd, cleanedFunc, partitions, callSite, resultHandler, localProperties.get)
}
DAGScheduler
类源码分析
dagScheduler.runJob
方法
def runJob[T, U](
rdd: RDD[T],
func: (TaskContext, Iterator[T]) => U,
partitions: Seq[Int],
callSite: CallSite,
resultHandler: (Int, U) => Unit,
properties: Properties): Unit = {
// Submit the job; the returned JobWaiter is used to track the job's success or failure
val waiter = submitJob(rdd, func, partitions, callSite, resultHandler, properties)
}
dagScheduler.submitJob
方法
def submitJob[T, U](
rdd: RDD[T],
func: (TaskContext, Iterator[T]) => U,
partitions: Seq[Int],
callSite: CallSite,
resultHandler: (Int, U) => Unit,
properties: Properties): JobWaiter[U] = {
// Create the JobWaiter object (one waiter per job, tracking partitions.size tasks)
val waiter = new JobWaiter(this, jobId, partitions.size, resultHandler)
// Post a JobSubmitted message to the internal event-processing loop
eventProcessLoop.post(JobSubmitted(
jobId, rdd, func2, partitions.toArray, callSite, waiter,
SerializationUtils.clone(properties)))
waiter
}
DAGSchedulerEventProcessLoop
类
DAGSchedulerEventProcessLoop
是 DAGScheduler
内部的事件循环处理器, 用于处理DAGSchedulerEvent
类型的事件.
前面发送的是JobSubmitted
类型的事件
// Dispatches DAGSchedulerEvent messages; a JobSubmitted event goes to handleJobSubmitted.
private def doOnReceive(event: DAGSchedulerEvent): Unit = event match {
case JobSubmitted(jobId, rdd, func, partitions, callSite, listener, properties) =>
// Handle the submitted Job
dagScheduler.handleJobSubmitted(jobId, rdd, func, partitions, callSite, listener, properties)
}
DAGScheduler.handleJobSubmitted
private[scheduler] def handleJobSubmitted(jobId: Int,
finalRDD: RDD[_],
func: (TaskContext, Iterator[_]) => _,
partitions: Array[Int],
callSite: CallSite,
listener: JobListener,
properties: Properties) {
var finalStage: ResultStage = null
try {
// New stage creation may throw an exception if, for example, jobs are run on a
// HadoopRDD whose underlying HDFS files have been deleted.
// Stage division is inferred from back to front, so the final (result) stage is created first
finalStage = createResultStage(finalRDD, func, partitions, jobId, callSite)
} catch {
}
submitStage(finalStage)
}
DAGScheduler.createResultStage()
方法
private def createResultStage(
rdd: RDD[_],
func: (TaskContext, Iterator[_]) => _,
partitions: Array[Int],
jobId: Int,
callSite: CallSite): ResultStage = {
// 1. Get the list of all parent Stages
val parents: List[Stage] = getOrCreateParentStages(rdd, jobId)
// 2. Generate an id for the ResultStage
val id = nextStageId.getAndIncrement()
// 3. Create the ResultStage
val stage: ResultStage = new ResultStage(id, rdd, func, partitions, parents, jobId, callSite)
// 4. Map the stageId to the ResultStage
stageIdToStage(id) = stage
updateJobIdStageIdMaps(jobId, stage)
stage
}
DAGScheduler.getOrCreateParentStages()
方法
private def getOrCreateParentStages(rdd: RDD[_], firstJobId: Int): List[Stage] = {
// Get all shuffle dependencies (wide dependencies) of this RDD
getShuffleDependencies(rdd).map { shuffleDep =>
// For each shuffle dependency, get or create a new Stage: a ShuffleMapStage
getOrCreateShuffleMapStage(shuffleDep, firstJobId)
}.toList
}
说明:
- 一共有两种
Stage
:ResultStage
和ShuffleMapStage
DAGScheduler.getShuffleDependencies
// Returns the shuffle (wide) dependencies of the given RDD. Note the traversal stops at each
// ShuffleDependency it finds (it only pushes narrow dependencies' RDDs back onto the stack),
// so this yields the *nearest* shuffle dependencies, not all transitive ones.
private[scheduler] def getShuffleDependencies(
rdd: RDD[_]): HashSet[ShuffleDependency[_, _, _]] = {
val parents = new HashSet[ShuffleDependency[_, _, _]]
val visited = new HashSet[RDD[_]]
// Stack-based traversal of the dependency graph, starting from the given RDD
val waitingForVisit = new Stack[RDD[_]]
waitingForVisit.push(rdd)
while (waitingForVisit.nonEmpty) {
val toVisit = waitingForVisit.pop()
if (!visited(toVisit)) {
visited += toVisit
toVisit.dependencies.foreach {
// Wide dependency: record it and do not traverse past it
case shuffleDep: ShuffleDependency[_, _, _] =>
parents += shuffleDep
// Narrow dependency: keep walking up through its parent RDD
case dependency =>
waitingForVisit.push(dependency.rdd)
}
}
}
parents
}
DAGScheduler.submitStage(finalStage)
方法
提交 finalStage
// Submits a stage, recursively submitting any missing (not yet computed) parent stages first.
private def submitStage(stage: Stage) {
val jobId = activeJobForStage(stage)
if (jobId.isDefined) {
if (!waitingStages(stage) && !runningStages(stage) && !failedStages(stage)) {
// Get the parent Stages that have not been computed yet ("missing")
val missing = getMissingParentStages(stage).sortBy(_.id)
// If there are none, this Stage is ready: submit it
if (missing.isEmpty) {
submitMissingTasks(stage, jobId.get)
} else { // Otherwise recursively submit each missing parent Stage first
for (parent <- missing) {
submitStage(parent)
}
// Park this stage until its parents finish
waitingStages += stage
}
}
} else {
}
}
说明:
从前面的分析可以看到, 阶段划分是从后向前
最前面的 Stage 先提交
DAGScheduler.submitMissingTasks
方法
// Turns a ready stage into concrete tasks and hands them to the TaskScheduler.
private def submitMissingTasks(stage: Stage, jobId: Int) {
// Task division: one Task is created per partition to compute
val tasks: Seq[Task[_]] = try {
stage match {
// Intermediate stage: each partition becomes a ShuffleMapTask
case stage: ShuffleMapStage =>
partitionsToCompute.map { id =>
val locs = taskIdToLocations(id)
val part = stage.rdd.partitions(id)
new ShuffleMapTask(stage.id, stage.latestInfo.attemptId,
taskBinary, part, locs, stage.latestInfo.taskMetrics, properties, Option(jobId),
Option(sc.applicationId), sc.applicationAttemptId)
}
// Final stage: each partition becomes a ResultTask
case stage: ResultStage =>
partitionsToCompute.map { id =>
val p: Int = stage.partitions(id)
val part = stage.rdd.partitions(p)
val locs = taskIdToLocations(id)
new ResultTask(stage.id, stage.latestInfo.attemptId,
taskBinary, part, locs, id, properties, stage.latestInfo.taskMetrics,
Option(jobId), Option(sc.applicationId), sc.applicationAttemptId)
}
}
} catch {
}
// Submit the tasks
if (tasks.size > 0) {
// Wrap them in a TaskSet and submit via the taskScheduler
taskScheduler.submitTasks(new TaskSet(
tasks.toArray, stage.id, stage.latestInfo.attemptId, jobId, properties))
} else {
}
}
TaskScheduler
类源码分析
TaskScheduler
是一个Trait
, 我们分析它的实现类: TaskSchedulerImpl
// TaskSchedulerImpl implementation: registers the TaskSet with the scheduling pool
// and asks the backend to offer resources.
override def submitTasks(taskSet: TaskSet) {
val tasks = taskSet.tasks
this.synchronized {
// Create a TaskSetManager object, used to track every task in this TaskSet
val manager: TaskSetManager = createTaskSetManager(taskSet, maxTaskFailures)
val stage = taskSet.stageId
// Hand the manager (and its TaskSet) to the appropriate scheduling pool (FIFO/FAIR builder)
schedulableBuilder.addTaskSetManager(manager, manager.taskSet.properties)
}
// Communicate with the SchedulerBackend to request resource offers
backend.reviveOffers()
}
CoarseGrainedSchedulerBackend.reviveOffers
override def reviveOffers() {
// The DriverEndpoint sends a ReviveOffers message to itself
driverEndpoint.send(ReviveOffers)
}
DriverEndpoint.makeOffers
方法 (DriverEndpoint.receive 收到 ReviveOffers 消息后调用)
// Builds resource offers from the live executors and launches whatever tasks
// the scheduler assigns to those offers.
private def makeOffers() {
// Filter out the active (alive) executors
val activeExecutors = executorDataMap.filterKeys(executorIsAlive)
// Wrap each executor's free resources as a WorkerOffer
val workOffers = activeExecutors.map { case (id, executorData) =>
new WorkerOffer(id, executorData.executorHost, executorData.freeCores)
}.toIndexedSeq
// Ask the scheduler to match tasks to offers, then launch the resulting tasks
launchTasks(scheduler.resourceOffers(workOffers))
}
DriverEndpoint.launchTasks
// Serializes each scheduled task and sends it to its assigned executor.
private def launchTasks(tasks: Seq[Seq[TaskDescription]]) {
for (task <- tasks.flatten) {
// Serialize the task
val serializedTask = ser.serialize(task)
// Oversized tasks cannot be sent over RPC (handling elided in this excerpt)
if (serializedTask.limit >= maxRpcMessageSize) {
}
else {
val executorData = executorDataMap(task.executorId)
// Account for the cores this task will occupy on the executor
executorData.freeCores -= scheduler.CPUS_PER_TASK
// Send the task to the Executor; CoarseGrainedExecutorBackend will receive the message
executorData.executorEndpoint.send(LaunchTask(new SerializableBuffer(serializedTask)))
}
}
}
CoarseGrainedExecutorBackend
源码分析
override def receive: PartialFunction[Any, Unit] = {
// LaunchTask is sent by the driver's DriverEndpoint (see launchTasks above is on the driver side)
case LaunchTask(data) =>
if (executor == null) {
exitExecutor(1, "Received LaunchTask command but executor was null")
} else {
// Deserialize the task to be executed
val taskDesc = ser.deserialize[TaskDescription](data.value)
// Start executing the task on this executor
executor.launchTask(this, taskId = taskDesc.taskId, attemptNumber = taskDesc.attemptNumber,
taskDesc.name, taskDesc.serializedTask)
}
}