scope

SPARK-6934

See also the Databricks blog post: Understanding your Apache Spark application through visualization

RDDOperationScope represents a block of code that creates RDDs. The UI uses it to visualize the relationships between RDDs, for example to show which RDDs are pipelined together within a single stage. Its definition:

private[spark] class RDDOperationScope(
    val name: String,
    val parent: Option[RDDOperationScope] = None,
    val id: String = RDDOperationScope.nextScopeId().toString) {
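
Scopes chain through the parent field, which is how the UI reconstructs the whole enclosing hierarchy for an RDD. A conceptual sketch of that chaining (the scope names are illustrative, and RDDOperationScope is private[spark], so this is not user-facing code):

// Conceptual sketch: an inner scope keeps a reference to its enclosing scope
val outer = new RDDOperationScope("saveAsObjectFiles")
val inner = new RDDOperationScope("saveAsObjectFile", parent = Some(outer))
// getAllScopes returns the chain from the outermost ancestor down to this scope:
inner.getAllScopes.map(_.name)  // Seq("saveAsObjectFiles", "saveAsObjectFile")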

For example, in the UI's DAG visualization (figure not reproduced here), two blue boxes correspond to two different scopes, with each scope's name shown in the box's top-right corner. The application code that produced that figure:

//PCapFileWriter.receiverStream() line:164
val flowStream = ssc.receiverStream(flowReceiver)
//line:166
flowStream.saveAsObjectFiles(SavePath)

StreamingContext defines the "receiver stream" scope:

def receiverStream[T: ClassTag](receiver: Receiver[T]): ReceiverInputDStream[T] = {
    withNamedScope("receiver stream") {
      new PluggableInputDStream[T](this, receiver)
    }
  }
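
withNamedScope is a thin StreamingContext helper that passes an explicit scope name down to RDDOperationScope.withScope; it looks roughly like the following (exact signature may differ slightly between Spark versions):

private[streaming] def withNamedScope[U](name: String)(body: => U): U = {
    RDDOperationScope.withScope(sc, name, allowNesting = false, ignoreParent = false)(body)
  }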

DStream.saveAsObjectFiles() defines a scope, and RDD.saveAsObjectFile defines another scope inside it (displayInnerRDDOps = false tells foreachRDD not to display the call sites and scopes of the RDDs generated inside the closure separately; they are attributed to this scope instead):

//DStream
def saveAsObjectFiles(prefix: String, suffix: String = ""): Unit = ssc.withScope {
    val saveFunc = (rdd: RDD[T], time: Time) => {
      val file = rddToFileName(prefix, suffix, time)
      rdd.saveAsObjectFile(file)
    }
    this.foreachRDD(saveFunc, displayInnerRDDOps = false)
  }

//RDD
def saveAsObjectFile(path: String): Unit = withScope {
    this.mapPartitions(iter => iter.grouped(10).map(_.toArray))
      .map(x => (NullWritable.get(), new BytesWritable(Utils.serialize(x))))
      .saveAsSequenceFile(path)
  }
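
A quick round trip makes the resulting scopes easy to observe in the UI. A sketch, assuming an existing SparkContext sc and a fresh output path; sc.objectFile is the standard counterpart for reading the data back:

// Write under the "saveAsObjectFile" scope, then deserialize the grouped records
val nums = sc.parallelize(1 to 100)
nums.saveAsObjectFile("/tmp/objdemo")
val back = sc.objectFile[Int]("/tmp/objdemo")
println(back.count())  // 100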

RDDOperationScope finds the name of the method that called "withScope" by walking the stack trace, and uses it as the scope name:

private[spark] def withScope[T](
      sc: SparkContext,
      allowNesting: Boolean = false)(body: => T): T = {
    val ourMethodName = "withScope"
    val callerMethodName = Thread.currentThread.getStackTrace()
      .dropWhile(_.getMethodName != ourMethodName)
      .find(_.getMethodName != ourMethodName)
      .map(_.getMethodName)
      .getOrElse {
        // Log a warning just in case, but this should almost certainly never happen
        logWarning("No valid method name for this RDD operation scope!")
        "N/A"
      }
    withScope[T](sc, callerMethodName, allowNesting, ignoreParent = false)(body)
  }
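
The same stack-walking trick works outside Spark. A self-contained sketch (all names here are illustrative):

object ScopeNameDemo {
  def withScope[T](body: => T): T = {
    val ourMethodName = "withScope"
    val callerMethodName = Thread.currentThread.getStackTrace
      .dropWhile(_.getMethodName != ourMethodName) // skip the JVM frames above withScope
      .find(_.getMethodName != ourMethodName)      // first frame that is not withScope itself
      .map(_.getMethodName)
      .getOrElse("N/A")
    println(s"scope name = $callerMethodName")
    body
  }

  def myTransformation(): Int = withScope { 42 }

  def main(args: Array[String]): Unit = {
    myTransformation() // prints: scope name = myTransformation
  }
}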

Basically, every place in the streaming API that produces a new DStream calls withScope.

Each RDD associates itself with a scope object by reading it back from the environment populated above; since scope is a val, it is initialized when the RDD is instantiated:

//RDD
@transient private[spark] val scope: Option[RDDOperationScope] = {
    Option(sc.getLocalProperty(SparkContext.RDD_SCOPE_KEY)).map(RDDOperationScope.fromJson)
  }
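
The other half of the handshake is in RDDOperationScope.withScope(sc, name, ...), which serializes the new scope to JSON into that same local property around the body and restores the old value afterwards. A simplified sketch of the mechanism (the real code also handles allowNesting and the no-override flag; these members are private[spark], so this only compiles inside Spark itself):

// Simplified sketch of how withScope publishes the scope to RDD constructors
def withScopeSketch[T](sc: SparkContext, name: String)(body: => T): T = {
  val scopeKey = SparkContext.RDD_SCOPE_KEY // "spark.rdd.scope"
  val oldScopeJson = sc.getLocalProperty(scopeKey)
  val oldScope = Option(oldScopeJson).map(RDDOperationScope.fromJson)
  try {
    // Any RDD constructed inside `body` reads this property in its `scope` val
    sc.setLocalProperty(scopeKey, new RDDOperationScope(name, oldScope).toJson)
    body
  } finally {
    sc.setLocalProperty(scopeKey, oldScopeJson) // restore the enclosing scope
  }
}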

CallSite, in turn, records the user code that created this RDD; this should be the code line displayed inside the blue box:

// RDD
@transient private[spark] val creationSite = sc.getCallSite()
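
The call site is easy to observe from the shell via toDebugString, which prints it for every RDD in the lineage. A sketch, assuming a running SparkContext sc:

val rdd = sc.parallelize(1 to 10).map(_ * 2)
println(rdd.toDebugString)
// Prints something like:
//   (2) MapPartitionsRDD[1] at map at <console>:25 []
//    |  ParallelCollectionRDD[0] at parallelize at <console>:24 []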
