4.2 Spark Stage 级别调度
Spark
的任务调度是从DAG
切割开始,主要是由DAGScheduler
来完成。当遇到一个Action
操作后就会触发一个Job
的计算,并交给DAGScheduler
来提交,下图是涉及到Job
提交的相关方法调用流程图。
Job 由最终的
RDD
和Action
方法封装而成;SparkContext
将Job
交给DAGScheduler
提交,它会根据RDD
的血缘关系构成的DAG
进行切分,将一个Job
划分为若干Stages
,具体划分策略是,由最终的RDD
不断通过依赖回溯判断父依赖是否是宽依赖,即以Shuffle
为界,划分Stage
,窄依赖的RDD
之间被划分到同一个Stage
中,可以进行pipeline
式的计算。划分的
Stages
分两类,一类叫做ResultStage
,为DAG
最下游的Stage
,由Action
方法决定,另一类叫做ShuffleMapStage
,为下游Stage
准备数据
下面看一个简单的例子
WordCount
。
说明:
Job
由saveAsTextFile
触发,该Job
由RDD-3
和saveAsTextFile
方法组成,根据RDD
之间的依赖关系从RDD-3
开始回溯搜索,直到没有依赖的RDD-0
,在回溯搜索过程中,
RDD-3
依赖RDD-2
,并且是宽依赖,所以在RDD-2
和RDD-3
之间划分Stage
,RDD-3
被划到最后一个Stage
,即ResultStage
中RDD-2
依赖RDD-1
,RDD-1
依赖RDD-0
,这些依赖都是窄依赖,所以将RDD-0
、RDD-1
和RDD-2
划分到同一个Stage
,即ShuffleMapStage
中,实际执行的时候,数据记录会一气呵成地执行RDD-0
到RDD-2
的转化。不难看出,其本质上是一个深度优先搜索算法。
一个Stage
是否被提交,需要判断它的父Stage
是否执行,只有在父Stage
执行完毕才能提交当前Stage
如果一个Stage
没有父Stage
,那么从该Stage
开始提交。
Stage
提交时会将Task
信息(分区信息以及方法等)序列化并被打包成TaskSet
交给TaskScheduler
,一个Partition
对应一个Task
,另一方面TaskScheduler
会监控Stage
的运行状态,只有Executor
丢失或者Task
由于Fetch
失败才需要重新提交失败的Stage
以调度运行失败的任务,其他类型的Task
失败会在TaskScheduler
的调度过程中重试。
相对来说DAGScheduler
做的事情较为简单,仅仅是在Stage
层面上划分DAG
,提交Stage
并监控相关状态信息。
TaskScheduler
则相对较为复杂,下面详细阐述其细节。