Closure & Serialization
val conf = new SparkConf().setAppName("wordCount")
val sc = new SparkContext(conf)
val accum = sc.accumulator(0, "My accum")
// default slice 2
sc.parallelize(Array(1, 2, 3, 4)).foreach(x => accum += x)
The closure in the code above is compiled into an anonymous class, and it is this anonymous class that gets shipped around the cluster.
hxm@hxm-desktop:/opt/Learning/Spark$ ll target/classes/hxm/spark/learning/
total 56
drwxrwxr-x 2 hxm hxm 4096 Mar  3 13:56 ./
drwxrwxr-x 3 hxm hxm 4096 Mar  3 13:56 ../
-rw-rw-r-- 1 hxm hxm 1313 Mar  3 13:56 Accum$$anonfun$main$1.class
-rw-rw-r-- 1 hxm hxm 713 Mar  3 13:56 Accum.class
-rw-rw-r-- 1 hxm hxm 2037 Mar  3 13:56 Accum$.class
hxm@hxm-desktop:/opt/Learning/Spark$ javap target/classes/hxm/spark/learning/Accum\$\$anonfun\$main\$1.class
Compiled from "Accum.scala"
public final class hxm.spark.learning.Accum$$anonfun$main$1 extends scala.runtime.AbstractFunction1$mcVI$sp implements scala.Serializable {
public static final long serialVersionUID;
public final void apply(int);
public void apply$mcVI$sp(int);
public final java.lang.Object apply(java.lang.Object);
public hxm.spark.learning.Accum$$anonfun$main$1(org.apache.spark.Accumulator);
}
As you can see, the closure is serializable, and it captures accum, i.e. the Accumulator:
class Accumulator[T](@transient initialValue: T, param: AccumulatorParam[T], name: Option[String])
extends Accumulable[T,T](initialValue, param, name) {
class Accumulable[R, T] ( @transient initialValue: R, param: AccumulableParam[R, T], val name: Option[String])
extends Serializable {
The Accumulator itself is also serializable.
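Since both the anonymous function class and the Accumulator it captures implement Serializable, the whole closure can round-trip through plain Java serialization, which is what Spark's closure serializer uses by default. A minimal, self-contained sketch of that round trip (my own example, not Spark code):
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, ObjectInputStream, ObjectOutputStream}

object ClosureRoundTrip {
  def main(args: Array[String]): Unit = {
    val x = 5
    val f: Int => Int = n => n + x // closure capturing x, compiled to a Serializable class

    // "Driver side": serialize the closure to bytes
    val bos = new ByteArrayOutputStream()
    new ObjectOutputStream(bos).writeObject(f)

    // "Executor side": deserialize the bytes back into a working function
    val ois = new ObjectInputStream(new ByteArrayInputStream(bos.toByteArray))
    val g = ois.readObject().asInstanceOf[Int => Int]

    println(g(1)) // 6: the captured x travelled with the closure
  }
}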
For each task, the executor deserializes the function and the variables it captures. The relevant stack trace looks like this:
at org.apache.spark.Accumulable$$anonfun$readObject$1.apply$mcV$sp(Accumulators.scala:141)
at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:985)
at org.apache.spark.Accumulable.readObject(Accumulators.scala:135)
at java.io.ObjectInputStream.readObject(ObjectInputStream.java:349)
...
at org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:62)
at org.apache.spark.serializer.JavaSerializerInstance.deserialize(JavaSerializer.scala:87)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:57)
The corresponding code in ResultTask is:
override def runTask(context: TaskContext): U = {
  // Deserialize the RDD and the func using the broadcast variables.
  val ser = SparkEnv.get.closureSerializer.newInstance()
  val (rdd, func) = ser.deserialize[(RDD[T], (TaskContext, Iterator[T]) => U)](
    ByteBuffer.wrap(taskBinary.value), Thread.currentThread.getContextClassLoader)
  metrics = Some(context.taskMetrics)
  func(context, rdd.iterator(partition, context))
}
That ser.deserialize call is the line that deserializes the function. While the function is being deserialized, the corresponding Accumulator code that runs is:
private def readObject(in: ObjectInputStream): Unit = Utils.tryOrIOException {
  in.defaultReadObject()
  value_ = zero
  deserialized = true
  Accumulators.register(this, false)
}
In other words, the accum object is rebuilt on the executor and registered in thread-local storage.
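To make the effect of value_ = zero concrete, here is a toy model (my own sketch, not Spark's classes): every task starts from a freshly zeroed copy of the accumulator, and the driver merges the per-task results when the tasks report back.
// Illustrative stand-in for an accumulator; the names are mine, not Spark's API.
class LocalAccum(var value: Int = 0) {
  def add(v: Int): Unit = value += v
}

object AccumMergeSketch {
  def main(args: Array[String]): Unit = {
    // Two "tasks", each working on its own freshly zeroed copy,
    // mirroring value_ = zero in readObject above.
    val partitions = Seq(Seq(1, 2), Seq(3, 4))
    val perTaskResults = partitions.map { part =>
      val acc = new LocalAccum()   // fresh copy per task
      part.foreach(acc.add)        // like foreach(x => accum += x) on the executor
      acc.value
    }
    // The driver merges the per-task values back into the original accumulator.
    println(perTaskResults.sum)    // 10 == 1 + 2 + 3 + 4
  }
}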
In addition, here is a screenshot from the Spark Summit 2013 slides mentioned earlier:
In the slide, the closure on the left references param, a member of MyCoolRddApp, so the whole MyCoolRddApp instance has to be serialized, and that is presumably where serialization blows up.
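A rough reconstruction of the slide's example (only MyCoolRddApp and param come from the slide; the other names are my assumptions): the map closure reads this.param, so serializing the closure drags in the whole MyCoolRddApp instance, including any non-serializable field, and the job fails with a Task not serializable error.
import org.apache.spark.rdd.RDD

class MyCoolRddApp {
  val param = 3.14
  val log = new java.io.PrintWriter("app.log") // illustrative non-serializable field

  def work(rdd: RDD[Int]): Double =
    // `param` is really `this.param`, so the whole MyCoolRddApp instance
    // (including `log`) gets captured -> serialization fails at runtime
    rdd.map(_ + param).reduce(_ + _)
}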
Closure Clean
Closure Serialization
This article explains how closures are serialized:
Hygienic Closures for Scala Function Serialization
According to that article, when a closure references a variable defined outside it, i.e. a member of the enclosing class, Scala serializes the entire instance of that class, whether or not its other members are actually used:
when a class member value shows up in a closure that is defined inside the class body,
the entire instance, including any and all other member values, is included in the closure.
Presumably this is because a class may have any number of instances,
and the compiler is including the entire instance in the closure to properly resolve the correct member value
So code like the following is a trap:
class foo() extends Serializable {
  val v = 42 // easy to serialize
  val w = 4.5 // easy to serialize
  val data = (1 to 1000000000).toList // serialization landmine hiding in your closure

  // The returned function includes all of 'foo' instance in its closure
  def f() = (x: Int) => v * x
}
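The usual workaround is to copy the needed member into a local val so the closure captures only that value rather than this. A minimal sketch of the local-copy idiom (class and local names are mine, and this is not necessarily the exact pattern the article itself proposes):
class fooFixed() extends Serializable {
  val v = 42
  val data = (1 to 1000000000).toList // no longer dragged into the closure below

  def f() = {
    val localV = v           // copy the member into a local
    (x: Int) => localV * x   // the closure captures only localV, not `this`
  }
}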
Hence the closure cleaner in Spark; see this Quora answer: What does Closure.cleaner (func) mean in Spark?
Below is the output produced by the following test code:
// class ClosureCleanerSuite extends SparkFunSuite
test("closures inside an object") {
  assert(TestObject.run() === 30) // 6 + 7 + 8 + 9
}

object TestObject {
  def run(): Int = {
    var nonSer = new NonSerializable
    val x = 5
    withSpark(new SparkContext("local", "test")) { sc =>
      val nums = sc.parallelize(Array(1, 2, 3, 4))
      nums.map(_ + x).reduce(_ + _)
    }
  }
}
16/11/07 09:47:15.009 ScalaTest-main-running-ClosureCleanerSuite DEBUG ClosureCleaner: +++ Cleaning closure <function1> (org.apache.spark.util.TestObject$$anonfun$run$1$$anonfun$apply$37) +++
16/11/07 09:47:15.022 ScalaTest-main-running-ClosureCleanerSuite DEBUG ClosureCleaner: + declared fields: 2
16/11/07 09:47:15.022 ScalaTest-main-running-ClosureCleanerSuite DEBUG ClosureCleaner: public static final long org.apache.spark.util.TestObject$$anonfun$run$1$$anonfun$apply$37.serialVersionUID
16/11/07 09:47:15.023 ScalaTest-main-running-ClosureCleanerSuite DEBUG ClosureCleaner: private final org.apache.spark.util.TestObject$$anonfun$run$1 org.apache.spark.util.TestObject$$anonfun$run$1$$anonfun$apply$37.$outer
16/11/07 09:47:15.023 ScalaTest-main-running-ClosureCleanerSuite DEBUG ClosureCleaner: + declared methods: 3
16/11/07 09:47:15.023 ScalaTest-main-running-ClosureCleanerSuite DEBUG ClosureCleaner: public int org.apache.spark.util.TestObject$$anonfun$run$1$$anonfun$apply$37.apply$mcII$sp(int)
16/11/07 09:47:15.023 ScalaTest-main-running-ClosureCleanerSuite DEBUG ClosureCleaner: public final int org.apache.spark.util.TestObject$$anonfun$run$1$$anonfun$apply$37.apply(int)
16/11/07 09:47:15.023 ScalaTest-main-running-ClosureCleanerSuite DEBUG ClosureCleaner: public final java.lang.Object org.apache.spark.util.TestObject$$anonfun$run$1$$anonfun$apply$37.apply(java.lang.Object)
16/11/07 09:47:15.024 ScalaTest-main-running-ClosureCleanerSuite DEBUG ClosureCleaner: + inner classes: 0
16/11/07 09:47:15.024 ScalaTest-main-running-ClosureCleanerSuite DEBUG ClosureCleaner: + outer classes: 1
16/11/07 09:47:15.025 ScalaTest-main-running-ClosureCleanerSuite DEBUG ClosureCleaner: org.apache.spark.util.TestObject$$anonfun$run$1
16/11/07 09:47:15.025 ScalaTest-main-running-ClosureCleanerSuite DEBUG ClosureCleaner: + outer objects: 1
16/11/07 09:47:15.026 ScalaTest-main-running-ClosureCleanerSuite DEBUG ClosureCleaner: <function1>
16/11/07 09:47:15.027 ScalaTest-main-running-ClosureCleanerSuite DEBUG ClosureCleaner: + populating accessed fields because this is the starting closure
16/11/07 09:47:15.030 ScalaTest-main-running-ClosureCleanerSuite DEBUG ClosureCleaner: + fields accessed by starting closure: 1
16/11/07 09:47:15.031 ScalaTest-main-running-ClosureCleanerSuite DEBUG ClosureCleaner: (class org.apache.spark.util.TestObject$$anonfun$run$1,Set(x$43))
16/11/07 09:47:15.031 ScalaTest-main-running-ClosureCleanerSuite DEBUG ClosureCleaner: + outermost object is a closure, so we just keep it: (class org.apache.spark.util.TestObject$$anonfun$run$1,<function1>)
16/11/07 09:47:15.032 ScalaTest-main-running-ClosureCleanerSuite DEBUG ClosureCleaner: + cloning the object <function1> of class org.apache.spark.util.TestObject$$anonfun$run$1
16/11/07 09:47:15.034 ScalaTest-main-running-ClosureCleanerSuite DEBUG ClosureCleaner: + cleaning cloned closure <function1> recursively (org.apache.spark.util.TestObject$$anonfun$run$1)
16/11/07 09:47:15.034 ScalaTest-main-running-ClosureCleanerSuite DEBUG ClosureCleaner: +++ Cleaning closure <function1> (org.apache.spark.util.TestObject$$anonfun$run$1) +++
16/11/07 09:47:15.036 ScalaTest-main-running-ClosureCleanerSuite DEBUG ClosureCleaner: + declared fields: 2
16/11/07 09:47:15.036 ScalaTest-main-running-ClosureCleanerSuite DEBUG ClosureCleaner: public static final long org.apache.spark.util.TestObject$$anonfun$run$1.serialVersionUID
16/11/07 09:47:15.036 ScalaTest-main-running-ClosureCleanerSuite DEBUG ClosureCleaner: public final int org.apache.spark.util.TestObject$$anonfun$run$1.x$43
16/11/07 09:47:15.036 ScalaTest-main-running-ClosureCleanerSuite DEBUG ClosureCleaner: + declared methods: 2
16/11/07 09:47:15.036 ScalaTest-main-running-ClosureCleanerSuite DEBUG ClosureCleaner: public final java.lang.Object org.apache.spark.util.TestObject$$anonfun$run$1.apply(java.lang.Object)
16/11/07 09:47:15.036 ScalaTest-main-running-ClosureCleanerSuite DEBUG ClosureCleaner: public final int org.apache.spark.util.TestObject$$anonfun$run$1.apply(org.apache.spark.SparkContext)
16/11/07 09:47:15.036 ScalaTest-main-running-ClosureCleanerSuite DEBUG ClosureCleaner: + inner classes: 2
16/11/07 09:47:15.037 ScalaTest-main-running-ClosureCleanerSuite DEBUG ClosureCleaner: org.apache.spark.util.TestObject$$anonfun$run$1$$anonfun$apply$38
16/11/07 09:47:15.037 ScalaTest-main-running-ClosureCleanerSuite DEBUG ClosureCleaner: org.apache.spark.util.TestObject$$anonfun$run$1$$anonfun$apply$37
16/11/07 09:47:15.037 ScalaTest-main-running-ClosureCleanerSuite DEBUG ClosureCleaner: + outer classes: 0
16/11/07 09:47:15.037 ScalaTest-main-running-ClosureCleanerSuite DEBUG ClosureCleaner: + outer objects: 0
16/11/07 09:47:15.037 ScalaTest-main-running-ClosureCleanerSuite DEBUG ClosureCleaner: + fields accessed by starting closure: 1
16/11/07 09:47:15.038 ScalaTest-main-running-ClosureCleanerSuite DEBUG ClosureCleaner: (class org.apache.spark.util.TestObject$$anonfun$run$1,Set(x$43))
16/11/07 09:47:15.038 ScalaTest-main-running-ClosureCleanerSuite DEBUG ClosureCleaner: + there are no enclosing objects!
16/11/07 09:47:15.038 ScalaTest-main-running-ClosureCleanerSuite DEBUG ClosureCleaner: +++ closure <function1> (org.apache.spark.util.TestObject$$anonfun$run$1) is now cleaned +++
16/11/07 09:47:15.038 ScalaTest-main-running-ClosureCleanerSuite DEBUG ClosureCleaner: +++ closure <function1> (org.apache.spark.util.TestObject$$anonfun$run$1$$anonfun$apply$37) is now cleaned +++
16/11/07 09:47:15.047 ScalaTest-main-running-ClosureCleanerSuite DEBUG ClosureCleaner: +++ Cleaning closure <function2> (org.apache.spark.util.TestObject$$anonfun$run$1$$anonfun$apply$38) +++
16/11/07 09:47:15.047 ScalaTest-main-running-ClosureCleanerSuite DEBUG ClosureCleaner: + declared fields: 1
16/11/07 09:47:15.047 ScalaTest-main-running-ClosureCleanerSuite DEBUG ClosureCleaner: public static final long org.apache.spark.util.TestObject$$anonfun$run$1$$anonfun$apply$38.serialVersionUID
16/11/07 09:47:15.048 ScalaTest-main-running-ClosureCleanerSuite DEBUG ClosureCleaner: + declared methods: 3
16/11/07 09:47:15.048 ScalaTest-main-running-ClosureCleanerSuite DEBUG ClosureCleaner: public int org.apache.spark.util.TestObject$$anonfun$run$1$$anonfun$apply$38.apply$mcIII$sp(int,int)
16/11/07 09:47:15.048 ScalaTest-main-running-ClosureCleanerSuite DEBUG ClosureCleaner: public final int org.apache.spark.util.TestObject$$anonfun$run$1$$anonfun$apply$38.apply(int,int)
16/11/07 09:47:15.048 ScalaTest-main-running-ClosureCleanerSuite DEBUG ClosureCleaner: public final java.lang.Object org.apache.spark.util.TestObject$$anonfun$run$1$$anonfun$apply$38.apply(java.lang.Object,java.lang.Object)
16/11/07 09:47:15.048 ScalaTest-main-running-ClosureCleanerSuite DEBUG ClosureCleaner: + inner classes: 0
16/11/07 09:47:15.048 ScalaTest-main-running-ClosureCleanerSuite DEBUG ClosureCleaner: + outer classes: 0
16/11/07 09:47:15.048 ScalaTest-main-running-ClosureCleanerSuite DEBUG ClosureCleaner: + outer objects: 0
16/11/07 09:47:15.048 ScalaTest-main-running-ClosureCleanerSuite DEBUG ClosureCleaner: + populating accessed fields because this is the starting closure
16/11/07 09:47:15.049 ScalaTest-main-running-ClosureCleanerSuite DEBUG ClosureCleaner: + fields accessed by starting closure: 0
16/11/07 09:47:15.049 ScalaTest-main-running-ClosureCleanerSuite DEBUG ClosureCleaner: + there are no enclosing objects!
16/11/07 09:47:15.049 ScalaTest-main-running-ClosureCleanerSuite DEBUG ClosureCleaner: +++ closure <function2> (org.apache.spark.util.TestObject$$anonfun$run$1$$anonfun$apply$38) is now cleaned +++
16/11/07 09:47:15.058 ScalaTest-main-running-ClosureCleanerSuite DEBUG ClosureCleaner: +++ Cleaning closure <function2> (org.apache.spark.SparkContext$$anonfun$36) +++
16/11/07 09:47:15.059 ScalaTest-main-running-ClosureCleanerSuite DEBUG ClosureCleaner: + declared fields: 2
16/11/07 09:47:15.059 ScalaTest-main-running-ClosureCleanerSuite DEBUG ClosureCleaner: public static final long org.apache.spark.SparkContext$$anonfun$36.serialVersionUID
16/11/07 09:47:15.059 ScalaTest-main-running-ClosureCleanerSuite DEBUG ClosureCleaner: private final scala.Function1 org.apache.spark.SparkContext$$anonfun$36.processPartition$1
16/11/07 09:47:15.059 ScalaTest-main-running-ClosureCleanerSuite DEBUG ClosureCleaner: + declared methods: 2
16/11/07 09:47:15.059 ScalaTest-main-running-ClosureCleanerSuite DEBUG ClosureCleaner: public final java.lang.Object org.apache.spark.SparkContext$$anonfun$36.apply(java.lang.Object,java.lang.Object)
16/11/07 09:47:15.059 ScalaTest-main-running-ClosureCleanerSuite DEBUG ClosureCleaner: public final java.lang.Object org.apache.spark.SparkContext$$anonfun$36.apply(org.apache.spark.TaskContext,scala.collection.Iterator)
16/11/07 09:47:15.059 ScalaTest-main-running-ClosureCleanerSuite DEBUG ClosureCleaner: + inner classes: 0
16/11/07 09:47:15.060 ScalaTest-main-running-ClosureCleanerSuite DEBUG ClosureCleaner: + outer classes: 0
16/11/07 09:47:15.060 ScalaTest-main-running-ClosureCleanerSuite DEBUG ClosureCleaner: + outer objects: 0
16/11/07 09:47:15.060 ScalaTest-main-running-ClosureCleanerSuite DEBUG ClosureCleaner: + populating accessed fields because this is the starting closure
16/11/07 09:47:15.060 ScalaTest-main-running-ClosureCleanerSuite DEBUG ClosureCleaner: + fields accessed by starting closure: 0
16/11/07 09:47:15.060 ScalaTest-main-running-ClosureCleanerSuite DEBUG ClosureCleaner: + there are no enclosing objects!
16/11/07 09:47:15.060 ScalaTest-main-running-ClosureCleanerSuite DEBUG ClosureCleaner: +++ closure <function2> (org.apache.spark.SparkContext$$anonfun$36) is now cleaned +++