Worker

private[spark] class Worker(...) extends Actor with ActorLogReceive with Logging {

1注册到master

// Akka lifecycle hook, invoked once before the actor begins processing
// messages. Verifies the worker has not already registered, then starts
// the registration handshake with the configured master(s).
override def preStart() {
  assert(!registered)
...
  registerWithMaster()
}
/**
 * Fires a RegisterWorker message at every configured master URL.
 * Registration is fire-and-forget: the master's RegisteredWorker reply
 * is handled asynchronously in receiveWithLogging.
 */
private def tryRegisterAllMasters() {
  masterUrls.foreach { url =>
    logInfo(s"Connecting to master $url...")
    val masterActor = context.actorSelection(Master.toAkkaUrl(url))
    masterActor ! RegisterWorker(workerId, host, port, cores, memory, webUi.boundPort, publicAddress)
  }
}

2定期发送HEARTBEAT

// Message handler for replies from the master (excerpt).
override def receiveWithLogging = {
  // Master acknowledged our registration: record it as the active master
  // and start the periodic heartbeat timer.
  case RegisteredWorker(masterUrl, masterWebUiUrl) =>
     ...
    changeMaster(masterUrl, masterWebUiUrl)
    // Schedule a SendHeartbeat message to self every HEARTBEAT_MILLIS ms,
    // starting immediately (initial delay 0).
    context.system.scheduler.schedule(0 millis, HEARTBEAT_MILLIS millis, self, SendHeartbeat)

// Timer tick: forward a Heartbeat to the master, but only while connected.
case SendHeartbeat =>
  if (connected) { master ! Heartbeat(workerId) }

...
// The active master is addressed via an ActorSelection built from its Akka URL.
master = context.actorSelection(Master.toAkkaUrl(activeMasterUrl))

// Heartbeat interval: one quarter of the worker timeout
// (spark.worker.timeout, default 60 s) => 15 000 ms by default.
val HEARTBEAT_MILLIS = conf.getLong("spark.worker.timeout", 60) * 1000 / 4

缺省为 15 秒（60 × 1000 / 4 = 15000 毫秒）；master 是 ActorSelection

3 LaunchExecutor

// Build an ExecutorRunner for the requested executor; its state starts as LOADING.
val manager = new ExecutorRunner(appId, execId, appDesc, cores_, memory_,
  self, workerId, host, sparkHome, executorDir, akkaUrl, conf, ExecutorState.LOADING)
// start() spawns a thread that launches the command carried in appDesc.
manager.start()
// Report the newly launched executor's state back to the master.
master ! ExecutorStateChanged(appId, execId, manager.state, None, None)

LaunchExecutor命令由master发出

start()即启动一个线程去拿appDesc里的命令去运行

ExecutorRunner启动ExecutorBackend

16/02/15 14:25:09 INFO Worker: Asked to launch executor app-20160215142509-0000/1 for wordCount
16/02/15 14:25:09 INFO ExecutorRunner: Launch command: "java" "-cp" "::/opt/Spark/spark-1.2.0/conf:/opt/Spark/spark-1.2.0/assembly/target/scala-2.10/spark-assembly-1.2.0-hadoop1.0.4.jar" "-XX:MaxPermSize=128m" "-Dspark.driver.port=59613" "-Xms512M" "-Xmx512M" "org.apache.spark.executor.CoarseGrainedExecutorBackend" "akka.tcp://[email protected]:59613/user/CoarseGrainedScheduler" "1" "192.168.101.32" "1" "app-20160215142509-0000" "akka.tcp://[email protected]:33310/user/Worker"

results matching ""

    No results matching ""