scope
See also: Understanding your Apache Spark application through visualization
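A scope represents the block of code that created an RDD. The UI uses scopes to visualize the relationships between RDDs, for example to show which RDDs are pipelined together within a stage.
Definition: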
private[spark] class RDDOperationScope(
    val name: String,
    val parent: Option[RDDOperationScope] = None,
    val id: String = RDDOperationScope.nextScopeId().toString) {
  // ... (class body omitted)
}
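A scope is essentially a node in a tree: a name, an optional parent, and a unique id. The following minimal sketch models that shape outside Spark. SimpleScope, ScopeIds and allScopes are hypothetical names; the id counter only mimics nextScopeId(), and the outermost-first parent walk follows the idea behind RDDOperationScope.getAllScopes rather than copying it.

```scala
import java.util.concurrent.atomic.AtomicInteger

// Hypothetical global id source, analogous to RDDOperationScope.nextScopeId()
object ScopeIds {
  private val counter = new AtomicInteger(0)
  def next(): Int = counter.getAndIncrement()
}

// Hypothetical, simplified stand-in for RDDOperationScope (not Spark's class)
final class SimpleScope(
    val name: String,
    val parent: Option[SimpleScope] = None,
    val id: String = ScopeIds.next().toString) {

  // This scope and all of its ancestors, outermost first
  def allScopes: Seq[SimpleScope] =
    parent.map(_.allScopes).getOrElse(Seq.empty[SimpleScope]) :+ this

  override def toString: String = s"$name#$id"
}
```

For instance, a scope named "saveAsObjectFile" whose parent is "saveAsObjectFiles" yields both names, outer first, from allScopes.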
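For example, in the figure the two blue boxes are two different scopes; each scope's name appears in the upper-right corner of its box.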
The application code that produced the figure above:
// PCapFileWriter.receiverStream(), line 164
val flowStream = ssc.receiverStream(flowReceiver)
// line 166
flowStream.saveAsObjectFiles(SavePath)
StreamingContext defines the scope "receiver stream":
def receiverStream[T: ClassTag](receiver: Receiver[T]): ReceiverInputDStream[T] = {
  withNamedScope("receiver stream") {
    new PluggableInputDStream[T](this, receiver)
  }
}
DStream.saveAsObjectFiles() defines a scope, and RDD.saveAsObjectFile, which it calls, defines a scope as well:
// DStream
def saveAsObjectFiles(prefix: String, suffix: String = ""): Unit = ssc.withScope {
  val saveFunc = (rdd: RDD[T], time: Time) => {
    val file = rddToFileName(prefix, suffix, time)
    rdd.saveAsObjectFile(file)
  }
  this.foreachRDD(saveFunc, displayInnerRDDOps = false)
}

// RDD
def saveAsObjectFile(path: String): Unit = withScope {
  this.mapPartitions(iter => iter.grouped(10).map(_.toArray))
    .map(x => (NullWritable.get(), new BytesWritable(Utils.serialize(x))))
    .saveAsSequenceFile(path)
}
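To see how nested scoped calls interact, here is a simplified model of the nesting rule in the four-argument RDDOperationScope.withScope. Assumptions: a ThreadLocal replaces the SparkContext local properties, scopes are plain case classes instead of JSON strings, and Scopes, Node and Ops are hypothetical names. With allowNesting = false (the default in the withScope signature shown later), an inner withScope call made while an outer scope is active does not open a new scope, so the transformations inside saveAsObjectFile are attributed to the saveAsObjectFile scope.

```scala
// Hypothetical, simplified model of the nesting rule in RDDOperationScope.withScope.
// Spark stores this state in SparkContext local properties; a ThreadLocal stands in here.
final case class Scope(name: String, parent: Option[Scope])

object Scopes {
  private final case class State(scope: Option[Scope], noOverride: Boolean)
  private val state = ThreadLocal.withInitial[State](() => State(None, noOverride = false))

  def currentScope: Option[Scope] = state.get.scope

  def withScope[T](name: String, allowNesting: Boolean = false)(body: => T): T = {
    val old = state.get
    try {
      // Open a child scope only if no enclosing call has forbidden overriding
      val scope = if (old.noOverride) old.scope else Some(Scope(name, old.scope))
      // allowNesting = false: calls made inside body must keep this scope
      state.set(State(scope, old.noOverride || !allowNesting))
      body
    } finally {
      // Restore the caller's state even if body throws
      state.set(old)
    }
  }
}

// Toy stand-in for an RDD: captures the active scope when constructed
final class Node(val op: String) {
  val scope: Option[Scope] = Scopes.currentScope
}

// Mimics a public operation that calls other scoped operations internally
object Ops {
  def mapPartitions(): Node = Scopes.withScope("mapPartitions") { new Node("mapPartitions") }
  def map(): Node = Scopes.withScope("map") { new Node("map") }
  def saveAsObjectFile(): Seq[Node] = Scopes.withScope("saveAsObjectFile") {
    Seq(mapPartitions(), map())
  }
}
```

Calling Ops.saveAsObjectFile() yields two nodes that both carry the "saveAsObjectFile" scope. Passing allowNesting = true on an outer call instead lets inner calls open child scopes whose parent is the outer one.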
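RDDOperationScope.withScope uses the name of the method that called "withScope" as the scope name. It finds that name by walking the current thread's stack trace: it skips frames until it reaches the first "withScope" frame, then takes the first frame after the run of "withScope" frames.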
private[spark] def withScope[T](
    sc: SparkContext,
    allowNesting: Boolean = false)(body: => T): T = {
  val ourMethodName = "withScope"
  val callerMethodName = Thread.currentThread.getStackTrace()
    .dropWhile(_.getMethodName != ourMethodName)
    .find(_.getMethodName != ourMethodName)
    .map(_.getMethodName)
    .getOrElse {
      // Log a warning just in case, but this should almost certainly never happen
      logWarning("No valid method name for this RDD operation scope!")
      "N/A"
    }
  withScope[T](sc, callerMethodName, allowNesting, ignoreParent = false)(body)
}
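The stack-walking lookup can be tried in isolation. This sketch copies the lookup logic but, as a hypothetical simplification, hands the inferred name to body instead of opening a scope; NameLookup, RddLike and Ops are made-up names. RddLike plays the role of RDD.withScope, which delegates to a method that is also named withScope; that is why the lookup skips the whole run of consecutive "withScope" frames rather than just one.

```scala
// Simplified copy of the caller-name lookup; the name is passed to body for illustration
object NameLookup {
  def withScope[T](body: String => T): T = {
    val ourMethodName = "withScope"
    val callerMethodName = Thread.currentThread.getStackTrace()
      .dropWhile(_.getMethodName != ourMethodName) // skip Thread.getStackTrace and similar
      .find(_.getMethodName != ourMethodName)      // first frame after all withScope frames
      .map(_.getMethodName)
      .getOrElse("N/A")
    body(callerMethodName)
  }
}

// Hypothetical wrapper with the same method name, like RDD.withScope
object RddLike {
  def withScope[T](body: String => T): T = NameLookup.withScope(body)
}

object Ops {
  // Direct call: the caller is saveAsObjectFile
  def saveAsObjectFile(): String = NameLookup.withScope(name => name)
  // Two withScope frames on the stack; the lookup still finds map
  def map(): String = RddLike.withScope(name => name)
}
```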
Basically, every operation in Spark Streaming that creates a new DStream calls withScope.
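Each RDD holds its associated scope, which it reads from the SparkContext local property (SparkContext.RDD_SCOPE_KEY) that withScope filled in earlier. Because scope is a strict (non-lazy) val in the RDD class body, it is evaluated when the RDD is instantiated, so an RDD picks up whichever scope is active while it is being constructed. It is also @transient, so it is not serialized along with the RDD.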
// RDD
@transient private[spark] val scope: Option[RDDOperationScope] = {
  Option(sc.getLocalProperty(SparkContext.RDD_SCOPE_KEY)).map(RDDOperationScope.fromJson)
}
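Whether the scope is fixed at instantiation can be checked with a small sketch: a strict val in a class body is evaluated by the constructor, whereas a lazy val is evaluated on first access, possibly after the scope has been restored. LocalProps, StrictNode and LazyNode are hypothetical names, and a ThreadLocal[String] stands in for the SparkContext local property (null when unset, mirroring getLocalProperty).

```scala
// Hypothetical stand-in for the SparkContext local property holding the scope
object LocalProps {
  private val scopeProp = new ThreadLocal[String] // null when unset

  def get: Option[String] = Option(scopeProp.get)

  def withScope[T](name: String)(body: => T): T = {
    val old = scopeProp.get
    scopeProp.set(name)
    try body finally scopeProp.set(old) // restore the previous value
  }
}

final class StrictNode {
  // Evaluated in the constructor, like RDD.scope
  val scope: Option[String] = LocalProps.get
}

final class LazyNode {
  // Evaluated on first access, which may happen outside withScope
  lazy val scope: Option[String] = LocalProps.get
}
```

A StrictNode created inside withScope("map") keeps Some("map"), while a LazyNode created in the same place but read afterwards sees None.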
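CallSite, on the other hand, corresponds to the user code that created the RDD; it is presumably the source location (file and line number) shown inside the blue boxes.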
// RDD
@transient private[spark] val creationSite = sc.getCallSite()
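A rough sketch of how a short call site can be derived from the stack trace: take the last library method on the stack and the first user frame after it. This approximates labels of the form "method at File.scala:line" (for example "map at MyApp.scala:12", an illustrative value) and is not a copy of Spark's own call-site logic; CallSites, MiniLib and the isLibrary predicate are hypothetical.

```scala
// Hypothetical sketch: "last library method at first user file:line"
object CallSites {
  def shortCallSite(isLibrary: StackTraceElement => Boolean): String = {
    val frames = Thread.currentThread.getStackTrace()
      .dropWhile(_.getClassName == "java.lang.Thread") // drop getStackTrace itself
    // Leading frames belong to the library, the rest to user code
    val (libraryFrames, userFrames) = frames.span(isLibrary)
    val lastLibraryMethod = libraryFrames.lastOption.map(_.getMethodName).getOrElse("<unknown>")
    val userLocation = userFrames.headOption
      .map(f => s"${f.getFileName}:${f.getLineNumber}")
      .getOrElse("<unknown>")
    s"$lastLibraryMethod at $userLocation"
  }
}

object MiniLib {
  // Hypothetical rule: frames from these objects count as library code
  private def isLibrary(f: StackTraceElement): Boolean =
    f.getClassName.contains("CallSites") || f.getClassName.contains("MiniLib")

  // A public "library" operation recording where user code called it
  def makeRdd(): String = CallSites.shortCallSite(isLibrary)
}
```

Called from user code, MiniLib.makeRdd() returns a string starting with "makeRdd at ", followed by the caller's file and line.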