Job, Stage, Task
运算总是以RDD的action方法结束；action方法最终会触发DAGScheduler的handleJobSubmitted方法。
以DAGScheduler为中心，把RDD组织成有向无环图（DAG），从最终的结果RDD出发，沿依赖关系向前回溯来划分Stage。
计算Stage
private[scheduler] def handleJobSubmitted(jobId: Int,
finalRDD: RDD[_],
func: (TaskContext, Iterator[_]) => _,
partitions: Array[Int],
allowLocal: Boolean,
callSite: CallSite,
listener: JobListener,
properties: Properties = null)
{
//Stage的shuffleDep是None, 则Stage中的isShuffleMap==false
finalStage = newStage(finalRDD, partitions.size, None, jobId, callSite)
/**
 * Creates a new [[Stage]] whose final RDD is `rdd`, registers it under a freshly
 * allocated stage id, and records it against `jobId`.
 *
 * The upstream stages are obtained from `getParentStages(rdd, jobId)`; the original
 * note marks this call as the place where all stages get computed.
 *
 * @param rdd        the RDD this stage ends with
 * @param numTasks   task count passed straight to the `Stage` constructor
 * @param shuffleDep passed straight to the `Stage` constructor; the caller in
 *                   handleJobSubmitted passes `None` for the final stage
 * @param jobId      job that owns the new stage
 * @param callSite   call site passed straight to the `Stage` constructor
 * @return the newly created and registered stage
 */
private def newStage(
    rdd: RDD[_],
    numTasks: Int,
    shuffleDep: Option[ShuffleDependency[_, _, _]],
    jobId: Int,
    callSite: CallSite): Stage = {
  // Parents are resolved BEFORE this stage draws its id from nextStageId. If
  // getParentStages itself creates stages through nextStageId (not visible here —
  // confirm), those parents end up with smaller ids. Keep this ordering.
  val parents = getParentStages(rdd, jobId)
  val stageId = nextStageId.getAndIncrement()
  val created = new Stage(stageId, rdd, numTasks, shuffleDep, parents, jobId, callSite)
  // Explicit form of `stageIdToStage(stageId) = created`.
  stageIdToStage.update(stageId, created)
  updateJobIdStageIdMaps(jobId, created)
  created
}