4.4 Stage-Level Task Scheduling Source Code Analysis

SparkContext Initialization

Task scheduling relies on three very important components, all of which are created and started during SparkContext initialization:

  • SchedulerBackend

  • TaskScheduler

  • DAGScheduler

// Used to communicate with other components
private var _schedulerBackend: SchedulerBackend = _
// The DAG scheduler, one of the key components of the scheduling system: it creates Jobs,
// divides the RDDs in the DAG into Stages, submits Stages, and so on.
// The Job and Stage monitoring data shown in the Spark UI comes from DAGScheduler.
@volatile private var _dagScheduler: DAGScheduler = _
// TaskScheduler takes the resources the cluster manager has already allocated to the
// application, re-schedules them according to its scheduling algorithm, and assigns them to tasks.
// The Tasks that TaskScheduler schedules are created by DAGScheduler, so DAGScheduler is
// the upstream scheduler of TaskScheduler.
private var _taskScheduler: TaskScheduler = _

// Create the SchedulerBackend and the TaskScheduler
val (sched, ts): (SchedulerBackend, TaskScheduler) = SparkContext.createTaskScheduler(this, master, deployMode)
_schedulerBackend = sched
_taskScheduler = ts
// Create the DAGScheduler
_dagScheduler = new DAGScheduler(this)
// Start the TaskScheduler; internally this also starts the SchedulerBackend
_taskScheduler.start()

We start from an action, for example collect.
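For reference, here is a minimal, self-contained driver program that exercises this path (the app name and master URL are just placeholders):

import org.apache.spark.{SparkConf, SparkContext}

object CollectDemo {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("collect-demo").setMaster("local[2]")
    val sc = new SparkContext(conf)
    // collect() is the action: it calls sc.runJob, which hands the job to the DAGScheduler
    val doubled: Array[Int] = sc.parallelize(1 to 10).map(_ * 2).collect()
    println(doubled.mkString(", "))
    sc.stop()
  }
}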

RDD Class Source Code Analysis

def collect(): Array[T] = withScope {
    val results = sc.runJob(this, (iter: Iterator[T]) => iter.toArray)
    Array.concat(results: _*)
}

The sc.runJob method

def runJob[T, U: ClassTag](
        rdd: RDD[T],
        func: (TaskContext, Iterator[T]) => U,
        partitions: Seq[Int],
        resultHandler: (Int, U) => Unit): Unit = {
    // Clean the closure so it can be serialized and shipped to the executors
    val cleanedFunc = clean(func)
    // Hand the job to the DAGScheduler, which splits it into stages
    dagScheduler.runJob(rdd, cleanedFunc, partitions, callSite, resultHandler, localProperties.get)
}

DAGScheduler Class Source Code Analysis

The dagScheduler.runJob method

def runJob[T, U](
        rdd: RDD[T],
        func: (TaskContext, Iterator[T]) => U,
        partitions: Seq[Int],
        callSite: CallSite,
        resultHandler: (Int, U) => Unit,
        properties: Properties): Unit = {
    // Submit the job; the returned JobWaiter is used to learn whether the Job succeeded or failed
    val waiter = submitJob(rdd, func, partitions, callSite, resultHandler, properties)
    // (elided in this excerpt) the caller then blocks on the waiter until the job completes
}

The dagScheduler.submitJob method

def submitJob[T, U](
        rdd: RDD[T],
        func: (TaskContext, Iterator[T]) => U,
        partitions: Seq[Int],
        callSite: CallSite,
        resultHandler: (Int, U) => Unit,
        properties: Properties): JobWaiter[U] = {
    // Create the JobWaiter object
    val waiter = new JobWaiter(this, jobId, partitions.size, resultHandler)
    // Post a JobSubmitted message to the internal event loop
    eventProcessLoop.post(JobSubmitted(
        jobId, rdd, func2, partitions.toArray, callSite, waiter,
        SerializationUtils.clone(properties)))
    waiter
}

DAGSchedulerEventProcessLoop

DAGSchedulerEventProcessLoop is the event-loop processor inside DAGScheduler; it handles events of type DAGSchedulerEvent.

What was just posted is a JobSubmitted event, which doOnReceive dispatches to handleJobSubmitted:

private def doOnReceive(event: DAGSchedulerEvent): Unit = event match {
    case JobSubmitted(jobId, rdd, func, partitions, callSite, listener, properties) =>
        // Handle the submitted Job
        dagScheduler.handleJobSubmitted(jobId, rdd, func, partitions, callSite, listener, properties)
}
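The event loop itself is a simple producer-consumer pattern: Spark's EventLoop keeps a blocking deque and a daemon thread that drains it. A stripped-down sketch of the same idea (this is our illustration, not Spark's actual EventLoop class):

import java.util.concurrent.LinkedBlockingDeque

abstract class SimpleEventLoop[E](name: String) {
  private val queue = new LinkedBlockingDeque[E]()
  private val thread = new Thread(name) {
    setDaemon(true)
    override def run(): Unit =
      try {
        while (true) onReceive(queue.take()) // block until an event arrives, then dispatch
      } catch {
        case _: InterruptedException => // stop() was called; exit quietly
      }
  }

  def start(): Unit = thread.start()
  def stop(): Unit = thread.interrupt()
  def post(event: E): Unit = queue.put(event) // what eventProcessLoop.post does
  protected def onReceive(event: E): Unit
}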

DAGScheduler.handleJobSubmitted

private[scheduler] def handleJobSubmitted(jobId: Int,
                                          finalRDD: RDD[_],
                                          func: (TaskContext, Iterator[_]) => _,
                                          partitions: Array[Int],
                                          callSite: CallSite,
                                          listener: JobListener,
                                          properties: Properties) {

    var finalStage: ResultStage = null
    try {
        // New stage creation may throw an exception if, for example, jobs are run on a
        // HadoopRDD whose underlying HDFS files have been deleted.
        // Stages are divided by working backwards from the final RDD, so the last stage is created first
        finalStage = createResultStage(finalRDD, func, partitions, jobId, callSite)
    } catch {
        case e: Exception =>
            // (elided in this excerpt) report the failure to the listener and return
            return
    }

    submitStage(finalStage)
}

The DAGScheduler.createResultStage() method

private def createResultStage(
        rdd: RDD[_],
        func: (TaskContext, Iterator[_]) => _,
        partitions: Array[Int],
        jobId: Int,
        callSite: CallSite): ResultStage = {
    // 1. Get the list of all parent Stages
    val parents: List[Stage] = getOrCreateParentStages(rdd, jobId)
    // 2. Generate an id for the ResultStage
    val id = nextStageId.getAndIncrement()
    // 3. Create the ResultStage
    val stage: ResultStage = new ResultStage(id, rdd, func, partitions, parents, jobId, callSite)
    // 4. Map the stageId to the ResultStage
    stageIdToStage(id) = stage
    updateJobIdStageIdMaps(jobId, stage)
    stage
}

The DAGScheduler.getOrCreateParentStages() method

private def getOrCreateParentStages(rdd: RDD[_], firstJobId: Int): List[Stage] = {
    // Get all shuffle (wide) dependencies
    getShuffleDependencies(rdd).map { shuffleDep =>
        // For each shuffle dependency, get or create the corresponding ShuffleMapStage
        getOrCreateShuffleMapStage(shuffleDep, firstJobId)
    }.toList
}

Note:

  • There are two kinds of Stage in total: ResultStage and ShuffleMapStage

DAGScheduler.getShuffleDependencies

// Returns the shuffle (wide) dependencies that are immediate parents of the given RDD:
// the traversal stops at the first shuffle boundary it meets on each path, so it does
// not return the wide dependencies of ancestors beyond that boundary
private[scheduler] def getShuffleDependencies(
        rdd: RDD[_]): HashSet[ShuffleDependency[_, _, _]] = {
    val parents = new HashSet[ShuffleDependency[_, _, _]]
    val visited = new HashSet[RDD[_]]
    val waitingForVisit = new Stack[RDD[_]]
    waitingForVisit.push(rdd)
    // Depth-first traversal of the dependency graph
    while (waitingForVisit.nonEmpty) {
        val toVisit = waitingForVisit.pop()
        if (!visited(toVisit)) {
            visited += toVisit
            toVisit.dependencies.foreach {
                case shuffleDep: ShuffleDependency[_, _, _] =>
                    // A shuffle dependency marks a stage boundary: record it, stop descending
                    parents += shuffleDep
                case dependency =>
                    // A narrow dependency stays inside the current stage: keep walking up
                    waitingForVisit.push(dependency.rdd)
            }
        }
    }
    parents
}
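To watch those two cases fire on a real dependency graph, the toy job below (object and variable names are ours, not Spark's) applies the same pattern match one level at a time:

import org.apache.spark.{ShuffleDependency, SparkConf, SparkContext}

object DepDemo {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("dep-demo").setMaster("local[2]"))
    val pairs   = sc.parallelize(Seq(("a", 1), ("b", 2), ("a", 3)))
    val reduced = pairs.reduceByKey(_ + _)  // introduces a ShuffleDependency
    val mapped  = reduced.mapValues(_ * 10) // narrow dependency on `reduced`
    // The same test getShuffleDependencies performs on each visited RDD:
    mapped.dependencies.foreach {
      case dep: ShuffleDependency[_, _, _] => println(s"stage boundary: $dep")
      case dep => println(s"narrow dep, keep walking up from ${dep.rdd}")
    }
    // `mapped` itself has only a narrow dependency; the shuffle sits one level up:
    println(reduced.dependencies.head.getClass.getSimpleName) // prints ShuffleDependency
    sc.stop()
  }
}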

The DAGScheduler.submitStage(finalStage) method

Submit the finalStage:

private def submitStage(stage: Stage) {
    val jobId = activeJobForStage(stage)
    if (jobId.isDefined) {

        if (!waitingStages(stage) && !runningStages(stage) && !failedStages(stage)) {
            // Get the parent Stages that have not been computed yet
            val missing = getMissingParentStages(stage).sortBy(_.id)
            // If there are none, submit this Stage
            if (missing.isEmpty) {
                submitMissingTasks(stage, jobId.get)
            } else { // Otherwise recursively submit the missing parent Stages first,
                for (parent <- missing) {
                    submitStage(parent)
                }
                // and park this Stage until its parents have finished
                waitingStages += stage
            }
        }
    } else {
        // (elided in this excerpt) no active job needs this stage: abort it
    }
}

Note:

  • As the analysis above shows, stage division proceeds from back to front, starting at the final RDD

  • The front-most Stage is submitted first, as the example below illustrates
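A concrete example, assuming an existing SparkContext sc: the word count below contains exactly one shuffle (reduceByKey), so the job is cut into a ShuffleMapStage followed by a ResultStage, and each stage later yields one task per partition:

val lines  = sc.parallelize(Seq("a b", "b c"), numSlices = 2)
val counts = lines.flatMap(_.split(" ")).map((_, 1)).reduceByKey(_ + _)
// The indentation in toDebugString marks the shuffle boundary between the two stages
println(counts.toDebugString)
// collect() submits the ResultStage; submitStage recurses so that the parent
// ShuffleMapStage runs first, and the ResultStage follows once the shuffle output exists
counts.collect()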

The DAGScheduler.submitMissingTasks method

private def submitMissingTasks(stage: Stage, jobId: Int) {

    // Task division: create one Task per partition that needs to be computed
    val tasks: Seq[Task[_]] = try {
        stage match {
            case stage: ShuffleMapStage =>
                partitionsToCompute.map { id =>
                    val locs = taskIdToLocations(id)
                    val part = stage.rdd.partitions(id)
                    new ShuffleMapTask(stage.id, stage.latestInfo.attemptId,
                        taskBinary, part, locs, stage.latestInfo.taskMetrics, properties, Option(jobId),
                        Option(sc.applicationId), sc.applicationAttemptId)
                }

            case stage: ResultStage =>
                partitionsToCompute.map { id =>
                    val p: Int = stage.partitions(id)
                    val part = stage.rdd.partitions(p)
                    val locs = taskIdToLocations(id)
                    new ResultTask(stage.id, stage.latestInfo.attemptId,
                        taskBinary, part, locs, id, properties, stage.latestInfo.taskMetrics,
                        Option(jobId), Option(sc.applicationId), sc.applicationAttemptId)
                }
        }
    } catch {
        case e: Exception =>
            // (elided in this excerpt) abort the stage and return
            return
    }
    // Submit the tasks
    if (tasks.size > 0) {
        // Hand the tasks to the TaskScheduler, wrapped in a TaskSet
        taskScheduler.submitTasks(new TaskSet(
            tasks.toArray, stage.id, stage.latestInfo.attemptId, jobId, properties))
    } else {
        // (elided in this excerpt) nothing to run: mark the stage as finished
    }
}

TaskScheduler Class Source Code Analysis

TaskScheduler is a trait; we analyze its implementation class, TaskSchedulerImpl.

override def submitTasks(taskSet: TaskSet) {
    val tasks = taskSet.tasks

    this.synchronized {
        // Create a TaskSetManager, which tracks each task in the TaskSet
        val manager: TaskSetManager = createTaskSetManager(taskSet, maxTaskFailures)
        val stage = taskSet.stageId

        // Hand the manager and its TaskSet to the schedulable builder, which queues
        // them according to the configured scheduling policy
        schedulableBuilder.addTaskSetManager(manager, manager.taskSet.properties)

    }
    // Ask the SchedulerBackend for resources
    backend.reviveOffers()
}
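The schedulableBuilder used above is chosen by the spark.scheduler.mode setting: FIFO (the default, a single queue ordered by job and stage id) or FAIR (weighted pools). For example:

val conf = new SparkConf()
  .setAppName("scheduling-demo")
  .set("spark.scheduler.mode", "FAIR") // default is FIFO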

CoarseGrainedSchedulerBackend.reviveOffers

override def reviveOffers() {
    // The DriverEndpoint sends itself a ReviveOffers message
    driverEndpoint.send(ReviveOffers)
}

The DriverEndpoint.receive method handles ReviveOffers by calling makeOffers:

private def makeOffers() {
    // Filter out the active Executors
    val activeExecutors = executorDataMap.filterKeys(executorIsAlive)
    // Wrap each executor's free resources as a WorkerOffer
    val workOffers = activeExecutors.map { case (id, executorData) =>
        new WorkerOffer(id, executorData.executorHost, executorData.freeCores)
    }.toIndexedSeq
    // Let the TaskScheduler assign tasks to the offers, then launch them
    launchTasks(scheduler.resourceOffers(workOffers))
}
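scheduler.resourceOffers is where the TaskScheduler matches its queued TaskSets against the offered cores. A heavily simplified sketch of just the core-accounting idea (all names here are illustrative, not Spark's):

case class Offer(executorId: String, host: String, var freeCores: Int)

// Toy assignment loop: give each pending task to the first offer that still has
// enough free cores. Real Spark additionally honors locality preferences,
// blacklisting, and shuffles the offers to spread load across executors.
def assign(offers: Seq[Offer], numTasks: Int, cpusPerTask: Int = 1): Seq[(Int, String)] =
  (0 until numTasks).flatMap { taskId =>
    offers.find(_.freeCores >= cpusPerTask).map { offer =>
      offer.freeCores -= cpusPerTask
      taskId -> offer.executorId
    }
  }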

DriverEndpoint.launchTasks

private def launchTasks(tasks: Seq[Seq[TaskDescription]]) {
    for (task <- tasks.flatten) {
        // Serialize the task
        val serializedTask = ser.serialize(task)
        if (serializedTask.limit >= maxRpcMessageSize) {
            // (elided in this excerpt) abort the TaskSet: the serialized task is too large to send over RPC
        }
        else {
            // Deduct the cores this task will occupy from the executor's free cores
            val executorData = executorDataMap(task.executorId)
            executorData.freeCores -= scheduler.CPUS_PER_TASK

            // Send the task to the Executor; CoarseGrainedExecutorBackend will receive the message
            executorData.executorEndpoint.send(LaunchTask(new SerializableBuffer(serializedTask)))
        }
    }
}
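maxRpcMessageSize comes from the spark.rpc.message.maxSize setting (in MiB, 128 by default): an oversized task is never sent, and its TaskSet is aborted with an error. Tasks usually grow that large because a big object was captured in the task closure; the standard fix is to broadcast it instead, as in this fragment (rdd and bigTable are illustrative):

// Ship the table once per executor instead of once inside every serialized task
val lookup = sc.broadcast(bigTable)
val result = rdd.map(x => lookup.value.getOrElse(x, 0))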

CoarseGrainedExecutorBackend Source Code Analysis

override def receive: PartialFunction[Any, Unit] = {
    case LaunchTask(data) =>
        if (executor == null) {
            exitExecutor(1, "Received LaunchTask command but executor was null")
        } else {
            // Deserialize the task description
            val taskDesc = ser.deserialize[TaskDescription](data.value)
            // Start executing the task
            executor.launchTask(this, taskId = taskDesc.taskId, attemptNumber = taskDesc.attemptNumber,
                taskDesc.name, taskDesc.serializedTask)
        }
}
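On the executor side, launchTask wraps the task in a TaskRunner and hands it to a thread pool, so each running task gets its own thread. A toy sketch of that pattern (names are ours, not Spark's):

import java.util.concurrent.Executors

object MiniExecutor {
  // Like Spark's Executor, use a cached pool so concurrent tasks each get a thread
  private val threadPool = Executors.newCachedThreadPool()

  def launchTask(taskId: Long, body: () => Unit): Unit =
    threadPool.execute(new Runnable {
      override def run(): Unit = {
        // The real TaskRunner also deserializes the task payload, tracks metrics,
        // and reports status and results back to the driver via statusUpdate
        println(s"running task $taskId")
        body()
      }
    })
}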

At this point, stage-level task scheduling is complete: the tasks have been handed to the Executor for execution.
