4.5 Task-Level Scheduling: Source Code Analysis
taskScheduler.submitTasks(new TaskSet(
  tasks.toArray, stage.id, stage.latestInfo.attemptId, jobId, properties))
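As the earlier analysis showed, when DAGScheduler submits Tasks to TaskScheduler, it packs multiple Tasks into a TaskSet. A TaskSet is the basic unit by which Tasks are scheduled and managed in the scheduling pool, and each one is managed by a TaskSetManager in that pool.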
The taskScheduler.submitTasks method
// Hand the TaskSet over to the scheduling pool to be scheduled
schedulableBuilder.addTaskSetManager(manager, manager.taskSet.properties)
schedulableBuilder is of type SchedulableBuilder, a trait with two known implementations: FIFOSchedulableBuilder and FairSchedulableBuilder.
SchedulableBuilder (scheduling pool builder)
1. FIFOSchedulableBuilder
FIFOSchedulableBuilder.addTaskSetManager
override def addTaskSetManager(manager: Schedulable, properties: Properties) {
  // With FIFO scheduling, hand the manager straight to the root pool,
  // because FIFO scheduling has only the one root pool
  rootPool.addSchedulable(manager)
}
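Notes:
rootPool is the root scheduling pool. Its type is Pool, a schedulable entity that represents a collection of Pools or TaskSetManagers.
FIFO is the default scheduling algorithm. The spark.scheduler.mode property selects the algorithm: FIFO or FAIR.
The root pool is created when TaskSchedulerImpl is initialized.
FIFOSchedulableBuilder does not need to build any child pools; rootPool alone is enough:
override def buildPools() {
  // nothing
}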
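The snippet below sketches how a setting such as spark.scheduler.mode could be resolved to a scheduling mode, falling back to FIFO when the key is absent. The Map-based configuration, the SchedulingMode enumeration defined here, the resolveMode helper, and the case-insensitive parsing are all assumptions made for illustration; this is not Spark's own API.

```scala
object SchedulingModeSketch {
  // Hypothetical enumeration with the two modes named in the text.
  object SchedulingMode extends Enumeration {
    val FIFO = Value("FIFO")
    val FAIR = Value("FAIR")
  }

  // Hypothetical helper: reads the mode from a plain Map and
  // defaults to FIFO when the key is not set.
  def resolveMode(conf: Map[String, String]): SchedulingMode.Value = {
    val raw = conf.getOrElse("spark.scheduler.mode", "FIFO")
    // withName throws NoSuchElementException for an unknown mode name
    SchedulingMode.withName(raw.toUpperCase)
  }

  def main(args: Array[String]): Unit = {
    println(resolveMode(Map.empty))
    println(resolveMode(Map("spark.scheduler.mode" -> "FAIR")))
  }
}
```

An unknown value fails fast in this sketch, so a typo in the mode name would surface immediately instead of silently selecting a default.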
2. FairSchedulableBuilder
Fair scheduling needs more than the root pool: additional pools have to be created as well.
The FairSchedulableBuilder.buildPools method creates these extra child pools.
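The difference between the two builders can be sketched with a small model of the pool tree. Everything here (Schedulable, Pool, TaskSetManagerStub, FifoBuilder, FairBuilder, and the poolName parameter) is a simplified, hypothetical stand-in rather than Spark's real classes; creating a missing pool on demand is a choice made for this example.

```scala
import scala.collection.mutable

object PoolSketch {
  // Hypothetical stand-ins for the schedulable entities.
  trait Schedulable { def name: String }
  case class TaskSetManagerStub(name: String) extends Schedulable

  class Pool(val name: String) extends Schedulable {
    val children = mutable.ArrayBuffer.empty[Schedulable]
    def addSchedulable(s: Schedulable): Unit = children += s
  }

  // FIFO: no child pools; every manager hangs directly off the root pool.
  class FifoBuilder(val rootPool: Pool) {
    def buildPools(): Unit = ()
    def addTaskSetManager(m: Schedulable): Unit = rootPool.addSchedulable(m)
  }

  // FAIR: child pools sit under the root; managers go into a child pool.
  class FairBuilder(val rootPool: Pool, poolNames: Seq[String]) {
    private val pools = mutable.Map.empty[String, Pool]

    private def createPool(poolName: String): Pool = {
      val p = new Pool(poolName)
      pools(poolName) = p
      rootPool.addSchedulable(p)
      p
    }

    def buildPools(): Unit = poolNames.foreach(createPool)

    def addTaskSetManager(m: Schedulable, poolName: String): Unit = {
      // In this sketch an unknown pool is created on demand.
      val parent = pools.getOrElse(poolName, createPool(poolName))
      parent.addSchedulable(m)
    }
  }

  def main(args: Array[String]): Unit = {
    val root = new Pool("root")
    val fair = new FairBuilder(root, Seq("production"))
    fair.buildPools()
    fair.addTaskSetManager(TaskSetManagerStub("TaskSet_0"), "production")
    println(root.children.map(_.name).toList)
  }
}
```

With the FIFO builder the tree is one level deep (root, then managers); with the fair builder it is two levels deep (root, then pools, then managers).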
SchedulingAlgorithm (scheduling algorithm)
/**
 * An interface for sort algorithm
 * FIFO: FIFO algorithm between TaskSetManagers
 * FS: FS algorithm between Pools, and FIFO or FS within Pools
 */
private[spark] trait SchedulingAlgorithm {
  def comparator(s1: Schedulable, s2: Schedulable): Boolean
}
1. FIFOSchedulingAlgorithm
private[spark] class FIFOSchedulingAlgorithm extends SchedulingAlgorithm {
  // Returns true if s1 should be scheduled before s2
  override def comparator(s1: Schedulable, s2: Schedulable): Boolean = {
    val priority1 = s1.priority
    val priority2 = s2.priority
    var res = math.signum(priority1 - priority2)
    if (res == 0) {
      // Same priority: fall back to comparing stage ids
      val stageId1 = s1.stageId
      val stageId2 = s2.stageId
      res = math.signum(stageId1 - stageId2)
    }
    res < 0 // the smaller value is scheduled first
  }
}
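A small worked example may make the FIFO ordering concrete. The Entry case class below is a hypothetical stand-in that carries only the two fields the comparator reads; fifoBefore repeats the same rule (lower priority value first, then lower stageId) and is used with sortWith purely as an illustration.

```scala
object FifoOrderSketch {
  // Hypothetical stand-in for a Schedulable: only priority and stageId.
  case class Entry(priority: Int, stageId: Int)

  // Same rule as the FIFO comparator: compare priority, then stageId.
  def fifoBefore(s1: Entry, s2: Entry): Boolean = {
    var res = math.signum(s1.priority - s2.priority)
    if (res == 0) {
      res = math.signum(s1.stageId - s2.stageId)
    }
    res < 0
  }

  val queue: List[Entry] =
    List(Entry(1, 3), Entry(0, 2), Entry(1, 1), Entry(0, 0))

  def main(args: Array[String]): Unit = {
    // prints List(Entry(0,0), Entry(0,2), Entry(1,1), Entry(1,3))
    println(queue.sortWith(fifoBefore))
  }
}
```

Entries with priority 0 come before those with priority 1, and within the same priority the lower stageId wins.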
2. FairSchedulingAlgorithm
private[spark] class FairSchedulingAlgorithm extends SchedulingAlgorithm {
  override def comparator(s1: Schedulable, s2: Schedulable): Boolean = {
    val minShare1 = s1.minShare
    val minShare2 = s2.minShare
    val runningTasks1 = s1.runningTasks
    val runningTasks2 = s2.runningTasks
    val s1Needy = runningTasks1 < minShare1
    val s2Needy = runningTasks2 < minShare2
    val minShareRatio1 = runningTasks1.toDouble / math.max(minShare1, 1.0)
    val minShareRatio2 = runningTasks2.toDouble / math.max(minShare2, 1.0)
    val taskToWeightRatio1 = runningTasks1.toDouble / s1.weight.toDouble
    val taskToWeightRatio2 = runningTasks2.toDouble / s2.weight.toDouble
    var compare = 0
    if (s1Needy && !s2Needy) { // only s1 has runningTasks < minShare, so s1 goes first
      return true
    } else if (!s1Needy && s2Needy) { // only s2 is below its minShare, so s2 goes first
      return false
    } else if (s1Needy && s2Needy) { // both have runningTasks < minShare
      // Compare runningTasks / math.max(minShare, 1.0); the smaller ratio has higher priority
      compare = minShareRatio1.compareTo(minShareRatio2)
    } else {
      // Neither is below its minShare (runningTasks >= minShare for both):
      // compare runningTasks / weight; the smaller ratio has higher priority
      compare = taskToWeightRatio1.compareTo(taskToWeightRatio2)
    }
    if (compare < 0) {
      true
    } else if (compare > 0) {
      false
    } else {
      // Everything above ties: compare the names of the TaskSetManagers or Pools
      s1.name < s2.name
    }
  }
}
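The fair ordering is easier to follow with concrete numbers. The sketch below re-implements the same three-step rule on a hypothetical Entry case class (name, minShare, weight, runningTasks) and sorts four made-up entries; the values are invented for illustration only.

```scala
object FairOrderSketch {
  // Hypothetical stand-in for a Schedulable with the fields the comparator reads.
  case class Entry(name: String, minShare: Int, weight: Int, runningTasks: Int)

  def fairBefore(s1: Entry, s2: Entry): Boolean = {
    val s1Needy = s1.runningTasks < s1.minShare
    val s2Needy = s2.runningTasks < s2.minShare
    if (s1Needy && !s2Needy) true        // only s1 is below its minShare
    else if (!s1Needy && s2Needy) false  // only s2 is below its minShare
    else {
      val compare =
        if (s1Needy && s2Needy) {
          // Both needy: the smaller runningTasks / minShare ratio wins
          java.lang.Double.compare(
            s1.runningTasks.toDouble / math.max(s1.minShare, 1.0),
            s2.runningTasks.toDouble / math.max(s2.minShare, 1.0))
        } else {
          // Neither needy: the smaller runningTasks / weight ratio wins
          java.lang.Double.compare(
            s1.runningTasks.toDouble / s1.weight.toDouble,
            s2.runningTasks.toDouble / s2.weight.toDouble)
        }
      // A full tie falls back to comparing names
      if (compare != 0) compare < 0 else s1.name < s2.name
    }
  }

  val demo: List[Entry] = List(
    Entry("a", minShare = 2, weight = 1, runningTasks = 1), // needy, ratio 0.5
    Entry("b", minShare = 4, weight = 1, runningTasks = 1), // needy, ratio 0.25
    Entry("c", minShare = 0, weight = 2, runningTasks = 4), // not needy, 4 / 2 = 2.0
    Entry("d", minShare = 0, weight = 1, runningTasks = 1)  // not needy, 1 / 1 = 1.0
  )

  def main(args: Array[String]): Unit = {
    // prints List(b, a, d, c)
    println(demo.sortWith(fairBefore).map(_.name))
  }
}
```

The two needy entries (b, then a) come first, ordered by how far each is from its minShare; the remaining two follow, ordered by running tasks per unit of weight.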