DAGScheduler
Stage computation
When a job is submitted, the scheduler starts from the final RDD and works out the stages the job decomposes into.
Stages are computed in getParentStages: each ShuffleDependency marks the boundary of a new stage, and registerShuffleDependencies creates all of the remaining ancestor stages.
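For example, in a minimal word-count sketch (the input path and local master below are illustrative), the shuffle introduced by reduceByKey is the stage boundary: textFile/flatMap/map end up in a shuffle map stage, while reduceByKey and the action form the final result stage. This is the same shape as the WordCount job in the console output further below.
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.SparkContext._

object TwoStageWordCount {
  def main(args: Array[String]) {
    val sc = new SparkContext(new SparkConf().setAppName("TwoStageWordCount").setMaster("local[2]"))
    val counts = sc.textFile("input.txt")   // narrow dependencies: textFile -> flatMap -> map stay in the shuffle map stage
      .flatMap(_.split(" "))
      .map(word => (word, 1))
      .reduceByKey(_ + _)                   // ShuffleDependency: the DAGScheduler cuts a new (final) stage here
    counts.collect()                        // the action triggers handleJobSubmitted with the final RDD
    sc.stop()
  }
}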

private[scheduler] def handleJobSubmitted(jobId: Int,
    finalRDD: RDD[_],
    func: (TaskContext, Iterator[_]) => _,
    partitions: Array[Int],
    allowLocal: Boolean,
    callSite: CallSite,
    listener: JobListener,
    properties: Properties = null)
{
  var finalStage: Stage = null
  try {
    // stage construction starts here, from the final RDD
    finalStage = newStage(finalRDD, partitions.size, None, jobId, callSite)
  } catch {
  ...
private def newStage(
    rdd: RDD[_],
    numTasks: Int,
    shuffleDep: Option[ShuffleDependency[_, _, _]],
    jobId: Int,
    callSite: CallSite)
  : Stage =
{
  // split the DAG and find this stage's parent stages
  val parentStages = getParentStages(rdd, jobId)
  val id = nextStageId.getAndIncrement()
  val stage = new Stage(id, rdd, numTasks, shuffleDep, parentStages, jobId, callSite)
  stageIdToStage(id) = stage
  updateJobIdStageIdMaps(jobId, stage)
  stage
}
private def getParentStages(rdd: RDD[_], jobId: Int): List[Stage] = {
  val parents = new HashSet[Stage]
  val visited = new HashSet[RDD[_]]
  val waitingForVisit = new Stack[RDD[_]]
  def visit(r: RDD[_]) {
    if (!visited(r)) {
      visited += r
      for (dep <- r.dependencies) {
        dep match {
          case shufDep: ShuffleDependency[_, _, _] =>
            // a shuffle dependency starts a new stage; getShuffleMapStage creates
            // all ancestor stages on the way and registers their map output
            parents += getShuffleMapStage(shufDep, jobId)
          case _ =>
            waitingForVisit.push(dep.rdd)
        }
      }
    }
  }
  waitingForVisit.push(rdd)
  while (!waitingForVisit.isEmpty) {
    visit(waitingForVisit.pop())
  }
  parents.toList
}
// create a new stage and register its output locations
private def newOrUsedStage(
    rdd: RDD[_],
    numTasks: Int,
    shuffleDep: ShuffleDependency[_, _, _],
    jobId: Int,
    callSite: CallSite)
  : Stage =
{
  val stage = newStage(rdd, numTasks, Some(shuffleDep), jobId, callSite)
  if (mapOutputTracker.containsShuffle(shuffleDep.shuffleId)) {
    val serLocs = mapOutputTracker.getSerializedMapOutputStatuses(shuffleDep.shuffleId)
    val locs = MapOutputTracker.deserializeMapStatuses(serLocs)
    for (i <- 0 until locs.size) {
      stage.outputLocs(i) = Option(locs(i)).toList // locs(i) will be null if missing
    }
    // a non-null status means this partition's map output has already been computed
    stage.numAvailableOutputs = locs.count(_ != null)
  } else {
    logInfo("Registering RDD " + rdd.id + " (" + rdd.getCreationSite + ")")
    mapOutputTracker.registerShuffle(shuffleDep.shuffleId, rdd.partitions.size)
  }
  stage
}
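To make the availability bookkeeping concrete, here is a tiny standalone sketch of the Option(...).toList and count(_ != null) logic above (MapStatus is stubbed as a String; in Spark it carries the map output's location and sizes):
object OutputLocsSketch {
  def main(args: Array[String]) {
    // mirrors the deserialized MapStatus array: null = partition not computed yet
    val locs: Array[String] = Array("exec-0", null, "exec-1")
    val outputLocs: Array[List[String]] = locs.map(Option(_).toList) // List() when missing, List(status) otherwise
    val numAvailableOutputs = locs.count(_ != null)                  // 2: only partition 1 still needs a map task
    println(outputLocs.toList + " available=" + numAvailableOutputs)
  }
}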
Handle map task completion
When a ShuffleMapTask finishes:
case smt: ShuffleMapTask =>
  val status = event.result.asInstanceOf[MapStatus]
  val execId = status.location.executorId
  logDebug("ShuffleMapTask finished on " + execId)
  // record the map output location on the stage
  stage.addOutputLoc(smt.partitionId, status)
  if (runningStages.contains(stage) && stage.pendingTasks.isEmpty) { // otherwise keep waiting for the stage's remaining tasks
    // the whole stage has finished
    markStageAsFinished(stage)
    logInfo("looking for newly runnable stages")
    logInfo("running: " + runningStages)
    logInfo("waiting: " + waitingStages)
    logInfo("failed: " + failedStages)
    if (stage.shuffleDep.isDefined) {
      // register the output locations with the MapOutputTracker and bump the epoch
      mapOutputTracker.registerMapOutputs(
        stage.shuffleDep.get.shuffleId,
        stage.outputLocs.map(list => if (list.isEmpty) null else list.head).toArray,
        changeEpoch = true)
    }
    val newlyRunnable = new ArrayBuffer[Stage]
    // among the waiting stages, find those with no missing parent stages
    for (stage <- waitingStages if getMissingParentStages(stage) == Nil) {
      newlyRunnable += stage
    }
    waitingStages --= newlyRunnable
    runningStages ++= newlyRunnable
    // submit the newly runnable stages in order of stage id
    for {
      stage <- newlyRunnable.sortBy(_.id)
      jobId <- activeJobForStage(stage)
    } {
      logInfo("Submitting " + stage + " (" + stage.rdd + "), which is now runnable")
      submitMissingTasks(stage, jobId)
    }
  }
}
Task submission
/** Submits stage, but first recursively submits any missing parents. */
private def submitStage(stage: Stage) {
  val jobId = activeJobForStage(stage)
  if (jobId.isDefined) {
    logDebug("submitStage(" + stage + ")")
    if (!waitingStages(stage) && !runningStages(stage) && !failedStages(stage)) {
      // find this stage's missing parent stages (a parent that has already
      // finished no longer counts as missing), sorted by id
      val missing = getMissingParentStages(stage).sortBy(_.id)
      logDebug("missing: " + missing)
      if (missing == Nil) {
        logInfo("Submitting " + stage + " (" + stage.rdd + "), which has no missing parents")
        // no missing parents: submit this stage's tasks
        submitMissingTasks(stage, jobId.get)
      } else {
        for (parent <- missing) {
          // recursively submit the missing parent stages first
          submitStage(parent)
        }
        waitingStages += stage
      }
    }
  }
}
private def submitMissingTasks(stage: Stage, jobId: Int) {
  logDebug("submitMissingTasks(" + stage + ")")
  // First figure out the indexes of partition ids to compute.
  val partitionsToCompute: Seq[Int] = {
    if (stage.isShuffleMap) {
      // for a shuffle map stage: the partitions that have no output location yet
      (0 until stage.numPartitions).filter(id => stage.outputLocs(id) == Nil)
    } else {
      val job = stage.resultOfJob.get
      (0 until job.numPartitions).filter(id => !job.finished(id))
    }
  }
  runningStages += stage
  var taskBinary: Broadcast[Array[Byte]] = null
  ...
  val tasks: Seq[Task[_]] = if (stage.isShuffleMap) {
    // break the stage into one map task per partition to compute
    partitionsToCompute.map { id =>
      val locs = getPreferredLocs(stage.rdd, id)
      val part = stage.rdd.partitions(id)
      // create a ShuffleMapTask
      new ShuffleMapTask(stage.id, taskBinary, part, locs)
    }
  } else {
    val job = stage.resultOfJob.get
    partitionsToCompute.map { id =>
      val p: Int = job.partitions(id)
      val part = stage.rdd.partitions(p)
      val locs = getPreferredLocs(stage.rdd, p)
      // create a ResultTask
      new ResultTask(stage.id, taskBinary, part, locs, id)
    }
  }
  if (tasks.size > 0) {
    logInfo("Submitting " + tasks.size + " missing tasks from " + stage + " (" + stage.rdd + ")")
    stage.pendingTasks ++= tasks
    logDebug("New pending tasks: " + stage.pendingTasks)
    taskScheduler.submitTasks(
      new TaskSet(tasks.toArray, stage.id, stage.newAttemptId(), stage.jobId, properties))
    stage.latestInfo.submissionTime = Some(clock.getTime())
  }
}
TaskSchedulerImpl.submitTasks()
override def submitTasks(taskSet: TaskSet) {
  val tasks = taskSet.tasks
  logInfo("Adding task set " + taskSet.id + " with " + tasks.length + " tasks")
  this.synchronized {
    val manager = new TaskSetManager(this, taskSet, maxTaskFailures)
    activeTaskSets(taskSet.id) = manager
    // add the manager to the scheduling queue; the queue can be FIFO or Fair
    schedulableBuilder.addTaskSetManager(manager, manager.taskSet.properties)
    // check every STARVATION_TIMEOUT (15 seconds by default) whether any task has been accepted
    if (!isLocal && !hasReceivedTask) {
      starvationTimer.scheduleAtFixedRate(new TimerTask() {
        override def run() {
          if (!hasLaunchedTask) {
            logWarning("Initial job has not accepted any resources; " +
              "check your cluster UI to ensure that workers are registered " +
              "and have sufficient memory")
          } else {
            this.cancel()
          }
        }
      }, STARVATION_TIMEOUT, STARVATION_TIMEOUT)
    }
    hasReceivedTask = true
  }
  // ask the backend to offer resources and launch the tasks
  backend.reviveOffers()
}
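As the comment above notes, schedulableBuilder places the TaskSetManager into either a FIFO queue (the default) or a Fair scheduling pool. A sketch of switching to Fair scheduling via configuration (the allocation file path is illustrative):
import org.apache.spark.{SparkConf, SparkContext}

val conf = new SparkConf()
  .setAppName("FairSchedulingSketch")
  .set("spark.scheduler.mode", "FAIR")                                   // default is FIFO
  .set("spark.scheduler.allocation.file", "/path/to/fairscheduler.xml")  // optional pool definitions
val sc = new SparkContext(conf)
// TaskSets submitted through this context are queued by the FairSchedulableBuilder.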
console
16/01/26 09:54:35 INFO DAGScheduler: Submitting 2 missing tasks from Stage 0 (MappedRDD[3] at map at WordCount.scala:23)
16/01/26 09:54:35 DEBUG DAGScheduler: New pending tasks: Set(ShuffleMapTask(0, 0), ShuffleMapTask(0, 1))
16/01/26 09:54:35 INFO TaskSchedulerImpl: Adding task set 0.0 with 2 tasks
task execution in CoarseGrainedSchedulerBackend.scala
override def reviveOffers() {
  driverActor ! ReviveOffers
}

override def start() {
  driverActor = actorSystem.actorOf(
    Props(new DriverActor(properties)), name = CoarseGrainedSchedulerBackend.ACTOR_NAME)
}

class DriverActor(sparkProperties: Seq[(String, String)]) extends Actor with ActorLogReceive {
  override def preStart() {
    // Periodically revive offers to allow delay scheduling to work
    // check for runnable tasks once per second (spark.scheduler.revive.interval)
    val reviveInterval = conf.getLong("spark.scheduler.revive.interval", 1000)
    import context.dispatcher
    context.system.scheduler.schedule(0.millis, reviveInterval.millis, self, ReviveOffers)
  }

  def receiveWithLogging = {
    case ReviveOffers =>
      makeOffers()

  def makeOffers() {
    // executorDataMap holds per-executor resource data; the scheduler matches these
    // offers against the task queues and returns the tasks that can be launched
    launchTasks(scheduler.resourceOffers(executorDataMap.map { case (id, executorData) =>
      new WorkerOffer(id, executorData.executorHost, executorData.freeCores)
    }.toSeq))
  }

  def launchTasks(tasks: Seq[Seq[TaskDescription]]) {
    for (task <- tasks.flatten) {
      ... // task serialization and frame-size check elided
      val executorData = executorDataMap(task.executorId)
      executorData.freeCores -= scheduler.CPUS_PER_TASK
      // send a message to the executor to launch the task
      executorData.executorActor ! LaunchTask(new SerializableBuffer(serializedTask))
    }
  }
resourceOffers matches the tasks in the scheduling queues against the available executors.
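A highly simplified sketch of that matching idea (Offer, TaskDesc and matchOffers are made-up names for illustration; the real resourceOffers also handles locality levels and delay scheduling):
object OfferMatchingSketch {
  case class Offer(executorId: String, host: String, freeCores: Int)
  case class TaskDesc(taskId: Long, executorId: String)

  val CPUS_PER_TASK = 1

  // hand out queued task ids to executors that still have free cores
  def matchOffers(offers: Seq[Offer], pendingTaskIds: Iterator[Long]): Seq[TaskDesc] = {
    val launched = scala.collection.mutable.ArrayBuffer[TaskDesc]()
    for (offer <- offers) {
      var cores = offer.freeCores
      while (cores >= CPUS_PER_TASK && pendingTaskIds.hasNext) {
        launched += TaskDesc(pendingTaskIds.next(), offer.executorId)
        cores -= CPUS_PER_TASK
      }
    }
    launched.toSeq
  }

  def main(args: Array[String]) {
    val offers = Seq(Offer("exec-0", "host1", 2), Offer("exec-1", "host2", 1))
    println(matchOffers(offers, Iterator(0L, 1L, 2L, 3L)))
    // tasks 0 and 1 go to exec-0, task 2 to exec-1; task 3 waits for the next round of offers
  }
}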
executorDataMap init in CoarseGrainedSchedulerBackend
def receiveWithLogging = {
  case RegisterExecutor(executorId, hostPort, cores) =>
    logInfo("Registered executor: " + sender + " with ID " + executorId)
    sender ! RegisteredExecutor
    addressToExecutorId(sender.path.address) = executorId
    totalCoreCount.addAndGet(cores)
    totalRegisteredExecutors.addAndGet(1)
    val (host, _) = Utils.parseHostPort(hostPort)
    val data = new ExecutorData(sender, sender.path.address, host, cores, cores)
    CoarseGrainedSchedulerBackend.this.synchronized {
      // register the executor's data so that makeOffers() can see it
      executorDataMap.put(executorId, data)
    }
16/02/15 16:20:31 INFO SparkDeploySchedulerBackend: Registered executor: Actor[akka.tcp://[email protected]:36756/user/Executor#-1681212329] with ID 0
SparkDeploySchedulerBackend above extends SchedulerBackend and is created by SparkContext.
Once the executor backend starts, it registers its executor with the driver; the driver here is the DriverActor shown above. From CoarseGrainedExecutorBackend.scala:
override def preStart() {
  logInfo("Connecting to driver: " + driverUrl)
  driver = context.actorSelection(driverUrl)
  driver ! RegisterExecutor(executorId, hostPort, cores)
  context.system.eventStream.subscribe(self, classOf[RemotingLifecycleEvent])
}