DAGScheduler

Stage computation

When a job is submitted, the DAGScheduler starts from the final RDD and walks its lineage to split the job into stages and then into tasks.
The stages are computed in getParentStages: every shuffle boundary starts a new stage, and getShuffleMapStage / registerShuffleDependencies recursively create and register all ancestor stages.
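
For example, a simple word count contains exactly one shuffle, so its DAG splits into a shuffle-map stage (textFile/flatMap/map) and a final result stage. A minimal, hypothetical sketch (assuming it runs in spark-shell, where sc and the pair-RDD implicits are already in scope; the input path is made up):

// Hypothetical word count: flatMap and map are narrow dependencies and stay in
// one stage; reduceByKey introduces a ShuffleDependency, which splits off the
// final stage. collect() is the action that triggers handleJobSubmitted below.
val counts = sc.textFile("hdfs:///tmp/input.txt")
  .flatMap(_.split(" "))
  .map(word => (word, 1))     // last RDD of the shuffle-map stage
  .reduceByKey(_ + _)         // shuffle boundary -> new (final) stage
counts.collect()              // action: submits the job

The console output quoted further below (two ShuffleMapTasks submitted for Stage 0 of WordCount.scala) comes from a job of this shape.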

private[scheduler] def handleJobSubmitted(jobId: Int,
      finalRDD: RDD[_],
      func: (TaskContext, Iterator[_]) => _,
      partitions: Array[Int],
      allowLocal: Boolean,
      callSite: CallSite,
      listener: JobListener,
      properties: Properties = null)
  {
    var finalStage: Stage = null
    try {
      // stage construction starts here
      finalStage = newStage(finalRDD, partitions.size, None, jobId, callSite)
    } catch {
    ...

    private def newStage(
      rdd: RDD[_],
      numTasks: Int,
      shuffleDep: Option[ShuffleDependency[_, _, _]],
      jobId: Int,
      callSite: CallSite)
    : Stage =
  {
    // split the lineage and find the parent stages
    val parentStages = getParentStages(rdd, jobId)
    val id = nextStageId.getAndIncrement()
    val stage = new Stage(id, rdd, numTasks, shuffleDep, parentStages, jobId, callSite)
    stageIdToStage(id) = stage
    updateJobIdStageIdMaps(jobId, stage)
    stage
  }


  private def getParentStages(rdd: RDD[_], jobId: Int): List[Stage] = {
    val parents = new HashSet[Stage]
    val visited = new HashSet[RDD[_]]
    val waitingForVisit = new Stack[RDD[_]]
    def visit(r: RDD[_]) {
      if (!visited(r)) {
        visited += r
        for (dep <- r.dependencies) {
          dep match {
            case shufDep: ShuffleDependency[_, _, _] =>
            // a ShuffleDependency marks a new stage; getShuffleMapStage splits off all ancestor stages here and registers their map outputs
              parents += getShuffleMapStage(shufDep, jobId)
            case _ =>
              waitingForVisit.push(dep.rdd)
          }
        }
      }
    }
    waitingForVisit.push(rdd)
    while (!waitingForVisit.isEmpty) {
      visit(waitingForVisit.pop())
    }
    parents.toList
  }

  // create a new stage and register its output locations
  private def newOrUsedStage(
      rdd: RDD[_],
      numTasks: Int,
      shuffleDep: ShuffleDependency[_, _, _],
      jobId: Int,
      callSite: CallSite)
    : Stage =
  {
    val stage = newStage(rdd, numTasks, Some(shuffleDep), jobId, callSite)
    if (mapOutputTracker.containsShuffle(shuffleDep.shuffleId)) {
      val serLocs = mapOutputTracker.getSerializedMapOutputStatuses(shuffleDep.shuffleId)
      val locs = MapOutputTracker.deserializeMapStatuses(serLocs)
      for (i <- 0 until locs.size) {
        stage.outputLocs(i) = Option(locs(i)).toList   // locs(i) will be null if missing
      }
      // a non-null MapStatus means this partition's map output has already been computed
      stage.numAvailableOutputs = locs.count(_ != null)
    } else {
      logInfo("Registering RDD " + rdd.id + " (" + rdd.getCreationSite + ")")
      mapOutputTracker.registerShuffle(shuffleDep.shuffleId, rdd.partitions.size)
    }
    stage
  }
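
The Option(locs(i)).toList idiom above is what turns the null check into list bookkeeping: a missing map output (null) becomes an empty list, an existing MapStatus becomes a one-element list, so numAvailableOutputs = locs.count(_ != null) counts exactly the partitions whose output is already known. A quick illustration:

// Option(null) is None, Option(x) for non-null x is Some(x):
Option(null).toList             // => List()               -> partition not computed yet
Option("someMapStatus").toList  // => List(someMapStatus)  -> output already available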

Handle map task completion

When a ShuffleMapTask finishes:

case smt: ShuffleMapTask =>

            val status = event.result.asInstanceOf[MapStatus]
            val execId = status.location.executorId
            logDebug("ShuffleMapTask finished on " + execId)

              // record the map output location on the stage
              stage.addOutputLoc(smt.partitionId, status)

            if (runningStages.contains(stage) && stage.pendingTasks.isEmpty) {  // proceed only once all tasks of this stage have finished

              // this stage has finished
              markStageAsFinished(stage)

              logInfo("looking for newly runnable stages")
              logInfo("running: " + runningStages)
              logInfo("waiting: " + waitingStages)
              logInfo("failed: " + failedStages)

              if (stage.shuffleDep.isDefined) {

                // register the map output locations with the MapOutputTracker and advance the epoch
                mapOutputTracker.registerMapOutputs(
                  stage.shuffleDep.get.shuffleId,
                  stage.outputLocs.map(list => if (list.isEmpty) null else list.head).toArray,
                  changeEpoch = true)
              }


                val newlyRunnable = new ArrayBuffer[Stage]
                // among the waiting stages, find those with no missing parent stages
                for (stage <- waitingStages if getMissingParentStages(stage) == Nil) {
                  newlyRunnable += stage
                }

                waitingStages --= newlyRunnable
                runningStages ++= newlyRunnable

                // submit the newly runnable stages in order of stage id
                for {
                  stage <- newlyRunnable.sortBy(_.id)
                  jobId <- activeJobForStage(stage)
                } {
                  logInfo("Submitting " + stage + " (" + stage.rdd + "), which is now runnable")
                  submitMissingTasks(stage, jobId)
                }
              }

          }
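
stage.addOutputLoc(smt.partitionId, status) above is what drives this bookkeeping. Roughly (a simplified sketch based on Spark 1.x's Stage.scala, not a verbatim copy), it prepends the MapStatus for that partition and counts the partition as available the first time it gets an output:

// Sketch of Stage.addOutputLoc (simplified): a shuffle-map stage is considered
// available once numAvailableOutputs == numPartitions.
def addOutputLoc(partition: Int, status: MapStatus) {
  val prevList = outputLocs(partition)
  outputLocs(partition) = status :: prevList
  if (prevList == Nil) {
    numAvailableOutputs += 1
  }
}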

Task submission

/** Submits stage, but first recursively submits any missing parents. */
  private def submitStage(stage: Stage) {
    val jobId = activeJobForStage(stage)
    if (jobId.isDefined) {
      logDebug("submitStage(" + stage + ")")

      if (!waitingStages(stage) && !runningStages(stage) && !failedStages(stage)) {

        // find this stage's missing parent stages (once a parent stage has finished it no longer counts as missing), sorted by id
        val missing = getMissingParentStages(stage).sortBy(_.id)
        logDebug("missing: " + missing)
        if (missing == Nil) {
          logInfo("Submitting " + stage + " (" + stage.rdd + "), which has no missing parents")
          // no missing parents: submit this stage's tasks
          submitMissingTasks(stage, jobId.get)

        } else {

          for (parent <- missing) {
            // recursively submit the missing parent stages first
            submitStage(parent)
          }

          waitingStages += stage
        }
      }
    }
  }
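
Both submitStage and the map-completion handler above rely on getMissingParentStages, which is not quoted here. A simplified sketch (close to the Spark 1.x code, with details omitted) reuses the lineage traversal of getParentStages but only keeps shuffle-map parents whose outputs are not yet all available:

// Sketch of getMissingParentStages (simplified from Spark 1.x):
private def getMissingParentStages(stage: Stage): List[Stage] = {
  val missing = new HashSet[Stage]
  val visited = new HashSet[RDD[_]]
  val waitingForVisit = new Stack[RDD[_]]
  def visit(rdd: RDD[_]) {
    if (!visited(rdd)) {
      visited += rdd
      if (getCacheLocs(rdd).contains(Nil)) {    // only descend into uncached RDDs
        for (dep <- rdd.dependencies) {
          dep match {
            case shufDep: ShuffleDependency[_, _, _] =>
              val mapStage = getShuffleMapStage(shufDep, stage.jobId)
              if (!mapStage.isAvailable) {      // map outputs not all registered yet
                missing += mapStage
              }
            case narrowDep: NarrowDependency[_] =>
              waitingForVisit.push(narrowDep.rdd)
          }
        }
      }
    }
  }
  waitingForVisit.push(stage.rdd)
  while (!waitingForVisit.isEmpty) {
    visit(waitingForVisit.pop())
  }
  missing.toList
}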


  private def submitMissingTasks(stage: Stage, jobId: Int) {
    logDebug("submitMissingTasks(" + stage + ")")

    // First figure out the indexes of partition ids to compute.
    val partitionsToCompute: Seq[Int] = {
      if (stage.isShuffleMap) {
        // find the partitions that have no map output location yet
        (0 until stage.numPartitions).filter(id => stage.outputLocs(id) == Nil)
      } else {
        val job = stage.resultOfJob.get
        (0 until job.numPartitions).filter(id => !job.finished(id))
      }
    }

    runningStages += stage

    var taskBinary: Broadcast[Array[Byte]] = null
    ...

    val tasks: Seq[Task[_]] = if (stage.isShuffleMap) {

      // split the stage into one map task per partition to compute
      partitionsToCompute.map { id =>
        val locs = getPreferredLocs(stage.rdd, id)
        val part = stage.rdd.partitions(id)
        // create a ShuffleMapTask
        new ShuffleMapTask(stage.id, taskBinary, part, locs)
      }

    } else {
      val job = stage.resultOfJob.get
      partitionsToCompute.map { id =>
        val p: Int = job.partitions(id)
        val part = stage.rdd.partitions(p)
        val locs = getPreferredLocs(stage.rdd, p)
        // create a ResultTask
        new ResultTask(stage.id, taskBinary, part, locs, id)
      }
    }

    if (tasks.size > 0) {
      logInfo("Submitting " + tasks.size + " missing tasks from " + stage + " (" + stage.rdd + ")")
      stage.pendingTasks ++= tasks
      logDebug("New pending tasks: " + stage.pendingTasks)
      taskScheduler.submitTasks(
        new TaskSet(tasks.toArray, stage.id, stage.newAttemptId(), stage.jobId, properties))
      stage.latestInfo.submissionTime = Some(clock.getTime())
    }
  }

TaskSchedulerImpl.submitTasks()

  override def submitTasks(taskSet: TaskSet) {
    val tasks = taskSet.tasks
    logInfo("Adding task set " + taskSet.id + " with " + tasks.length + " tasks")
    this.synchronized {
      val manager = new TaskSetManager(this, taskSet, maxTaskFailures)
      activeTaskSets(taskSet.id) = manager
      // add the TaskSetManager to the scheduling queue; the queue comes in two flavors, FIFO and Fair
      schedulableBuilder.addTaskSetManager(manager, manager.taskSet.properties)

      // check every 15 seconds (STARVATION_TIMEOUT) whether any task has been accepted by the cluster
      if (!isLocal && !hasReceivedTask) {
        starvationTimer.scheduleAtFixedRate(new TimerTask() {
          override def run() {
            if (!hasLaunchedTask) {
              logWarning("Initial job has not accepted any resources; " +
                "check your cluster UI to ensure that workers are registered " +
                "and have sufficient memory")
            } else {
              this.cancel()
            }
          }
        }, STARVATION_TIMEOUT, STARVATION_TIMEOUT)
      }
      hasReceivedTask = true
    }

    // ask the backend to launch the tasks
    backend.reviveOffers()
  }
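
Which of the two queue shapes is used is controlled by the spark.scheduler.mode property. A minimal, hypothetical driver setup that switches from the default FIFO queue to the Fair scheduler:

// Hypothetical driver setup; only the spark.scheduler.mode setting matters here.
import org.apache.spark.{SparkConf, SparkContext}

val conf = new SparkConf()
  .setAppName("fair-scheduling-demo")
  .set("spark.scheduler.mode", "FAIR")   // default is "FIFO"
val sc = new SparkContext(conf)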

console

16/01/26 09:54:35 INFO DAGScheduler: Submitting 2 missing tasks from Stage 0 (MappedRDD[3] at map at WordCount.scala:23)
16/01/26 09:54:35 DEBUG DAGScheduler: New pending tasks: Set(ShuffleMapTask(0, 0), ShuffleMapTask(0, 1))
16/01/26 09:54:35 INFO TaskSchedulerImpl: Adding task set 0.0 with 2 tasks

task execution in CoarseGrainedSchedulerBackend.scala

override def reviveOffers() {
    driverActor ! ReviveOffers
  }

override def start() {
    driverActor = actorSystem.actorOf(
      Props(new DriverActor(properties)), name = CoarseGrainedSchedulerBackend.ACTOR_NAME)
  }
class DriverActor(sparkProperties: Seq[(String, String)]) extends Actor with ActorLogReceive {

    override def preStart() {
      // Periodically revive offers to allow delay scheduling to work
      // check once per second (spark.scheduler.revive.interval) whether there are tasks to launch
      val reviveInterval = conf.getLong("spark.scheduler.revive.interval", 1000)
      import context.dispatcher
      context.system.scheduler.schedule(0.millis, reviveInterval.millis, self, ReviveOffers)
    }

    def receiveWithLogging = {
      case ReviveOffers =>
        makeOffers()
    def makeOffers() {
    // executorDataMap holds the data registered for each executor; the scheduler matches these WorkerOffers against the task queues to build the sequence of tasks that can be launched
      launchTasks(scheduler.resourceOffers(executorDataMap.map { case (id, executorData) =>
        new WorkerOffer(id, executorData.executorHost, executorData.freeCores)
      }.toSeq))
    }


    def launchTasks(tasks: Seq[Seq[TaskDescription]]) {
      for (task <- tasks.flatten) {
        val ser = SparkEnv.get.closureSerializer.newInstance()
        val serializedTask = ser.serialize(task)
        val executorData = executorDataMap(task.executorId)
        executorData.freeCores -= scheduler.CPUS_PER_TASK
        // send a LaunchTask message to the executor to start the task
        executorData.executorActor ! LaunchTask(new SerializableBuffer(serializedTask))
      }
    }

resourceOffers is where the tasks in the scheduling queues are matched up with the available executors.
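
A rough, self-contained sketch of that matching is below; Offer and TaskDesc are stand-ins for Spark's internal WorkerOffer and TaskDescription, and the real resourceOffers additionally shuffles the offers and walks through locality levels:

// Simplified sketch: hand each free executor tasks from the pending queue until
// its cores run out or there is nothing left to launch.
import scala.collection.mutable.ArrayBuffer

case class Offer(executorId: String, host: String, var freeCores: Int)
case class TaskDesc(taskId: Long, executorId: String)

def matchOffers(offers: Seq[Offer],
                pendingTaskIds: ArrayBuffer[Long],
                cpusPerTask: Int = 1): Seq[Seq[TaskDesc]] =
  offers.map { offer =>
    val launched = ArrayBuffer[TaskDesc]()
    while (offer.freeCores >= cpusPerTask && pendingTaskIds.nonEmpty) {
      launched += TaskDesc(pendingTaskIds.remove(0), offer.executorId)
      offer.freeCores -= cpusPerTask
    }
    launched.toSeq
  }

// matchOffers(Seq(Offer("exec-0", "hostA", 2)), ArrayBuffer(0L, 1L, 2L))
//   => List(List(TaskDesc(0,exec-0), TaskDesc(1,exec-0)))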

executorDataMap init in CoarseGrainedSchedulerBackend

  def receiveWithLogging = {
      case RegisterExecutor(executorId, hostPort, cores) =>
          logInfo("Registered executor: " + sender + " with ID " + executorId)
          sender ! RegisteredExecutor

          addressToExecutorId(sender.path.address) = executorId
          totalCoreCount.addAndGet(cores)
          totalRegisteredExecutors.addAndGet(1)
          val (host, _) = Utils.parseHostPort(hostPort)
          val data = new ExecutorData(sender, sender.path.address, host, cores, cores)
          CoarseGrainedSchedulerBackend.this.synchronized {
          // register the executor
            executorDataMap.put(executorId, data)
          }

16/02/15 16:20:31 INFO SparkDeploySchedulerBackend: Registered executor: Actor[akka.tcp://[email protected]:36756/user/Executor#-1681212329] with ID 0

The SparkDeploySchedulerBackend above extends SchedulerBackend and is created during SparkContext initialization.

Once CoarseGrainedExecutorBackend starts, it registers its executor with the driver; the driver here is the DriverActor shown above. From CoarseGrainedExecutorBackend.scala:

override def preStart() {
    logInfo("Connecting to driver: " + driverUrl)
    driver = context.actorSelection(driverUrl)
    driver ! RegisterExecutor(executorId, hostPort, cores)
    context.system.eventStream.subscribe(self, classOf[RemotingLifecycleEvent])
  }
