Job, Stage, Task

运算总是以RDD的action方法结束；action方法经由SparkContext.runJob把作业提交给DAGScheduler，最终由DAGScheduler的handleJobSubmitted方法处理。

以DAGScheduler为中心，把RDD组织成有向无环图（DAG），从最终的结果RDD出发，沿依赖关系向前（向上游）回溯计算。

Stage的组织

计算Stage

/**
 * Handles a submitted job: the excerpt shows it building the job's final stage
 * from `finalRDD` via `newStage`.
 *
 * NOTE(review): this is a truncated excerpt — the rest of the method body, its
 * closing brace, and the declaration of `finalStage` are elided in these notes.
 *
 * @param jobId      id of the submitted job
 * @param finalRDD   the RDD the job's action is run on
 * @param func       function applied to each partition's iterator
 * @param partitions partition indices to compute; its size is used as the final stage's task count
 * @param allowLocal flag passed through by the caller (its use is not shown in this excerpt)
 * @param callSite   user call site associated with the job
 * @param listener   listener for job events (its use is not shown in this excerpt)
 * @param properties optional job properties; defaults to null
 */
private[scheduler] def handleJobSubmitted(jobId: Int,
      finalRDD: RDD[_],
      func: (TaskContext, Iterator[_]) => _,
      partitions: Array[Int],
      allowLocal: Boolean,
      callSite: CallSite,
      listener: JobListener,
      properties: Properties = null)
  {
  // The final stage is created with shuffleDep = None, so (per the original note) its isShuffleMap == false
  finalStage = newStage(finalRDD, partitions.size, None, jobId, callSite)
/**
 * Creates a new Stage for `rdd` and registers it with the scheduler.
 *
 * The stage's parents are resolved first, before the new stage draws its id
 * from `nextStageId`. The stage is then recorded in `stageIdToStage` and in
 * the job-to-stage bookkeeping for `jobId`.
 *
 * @param rdd        the RDD this stage is built for
 * @param numTasks   number of tasks for the stage (handleJobSubmitted passes partitions.size)
 * @param shuffleDep optional shuffle dependency stored on the stage (None for the final stage)
 * @param jobId      id of the job this stage is created for
 * @param callSite   user call site associated with the stage
 * @return the newly created and registered Stage
 */
private def newStage(
    rdd: RDD[_],
    numTasks: Int,
    shuffleDep: Option[ShuffleDependency[_, _, _]],
    jobId: Int,
    callSite: CallSite): Stage = {
  // Per the original notes, this call is where all of the (upstream) stages get computed.
  // It must happen before this stage takes its id from nextStageId.
  val parents = getParentStages(rdd, jobId)
  val stageId = nextStageId.getAndIncrement()
  val created = new Stage(stageId, rdd, numTasks, shuffleDep, parents, jobId, callSite)
  stageIdToStage(stageId) = created
  updateJobIdStageIdMaps(jobId, created)
  created
}
