Closure & Serialization


val conf = new SparkConf().setAppName("wordCount")
val sc = new SparkContext(conf)
val accum = sc.accumulator(0, "My accum")
// parallelize with the default number of slices (2 here)
sc.parallelize(Array(1, 2, 3, 4)).foreach(x => accum += x)

For the code above, the closure is compiled into an anonymous function class, and it is this anonymous class that gets shipped around the cluster.

hxm@hxm-desktop:/opt/Learning/Spark$ ll target/classes/hxm/spark/learning/
total 56
drwxrwxr-x 2 hxm hxm 4096 Mar  3 13:56 ./
drwxrwxr-x 3 hxm hxm 4096 Mar  3 13:56 ../
-rw-rw-r-- 1 hxm hxm 1313 Mar  3 13:56 Accum$$anonfun$main$1.class
-rw-rw-r-- 1 hxm hxm  713 Mar  3 13:56 Accum.class
-rw-rw-r-- 1 hxm hxm 2037 Mar  3 13:56 Accum$.class

hxm@hxm-desktop:/opt/Learning/Spark$ javap target/classes/hxm/spark/learning/Accum\$\$anonfun\$main\$1.class 
Compiled from "Accum.scala"
public final class hxm.spark.learning.Accum$$anonfun$main$1 extends scala.runtime.AbstractFunction1$mcVI$sp implements scala.Serializable {
  public static final long serialVersionUID;
  public final void apply(int);
  public void apply$mcVI$sp(int);
  public final java.lang.Object apply(java.lang.Object);
  public hxm.spark.learning.Accum$$anonfun$main$1(org.apache.spark.Accumulator);
}

From the javap output we can see that the closure class is Serializable, and that it holds accum, i.e. the Accumulator, which is passed in through its constructor.
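
To make the capture explicit, the generated class corresponds roughly to the hand-written function below (an illustrative sketch, not the actual compiler output; the class name ForeachClosure is made up):

import org.apache.spark.Accumulator

// Hand-written analogue of Accum$$anonfun$main$1 (illustrative only): a
// serializable Function1 that captures the Accumulator through its constructor.
class ForeachClosure(accum: Accumulator[Int]) extends (Int => Unit) with Serializable {
  // same body as the lambda x => accum += x above
  override def apply(x: Int): Unit = accum += x
}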

class Accumulator[T](@transient initialValue: T, param: AccumulatorParam[T], name: Option[String])
    extends Accumulable[T, T](initialValue, param, name) {

class Accumulable[R, T](@transient initialValue: R, param: AccumulableParam[R, T], val name: Option[String])
    extends Serializable {

So the Accumulator itself is serializable as well.

For each task, the function and the variables it captures are deserialized on the Executor.

Let's look at the stack trace:

at org.apache.spark.Accumulable$$anonfun$readObject$1.apply$mcV$sp(Accumulators.scala:141)
    at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:985)
    at org.apache.spark.Accumulable.readObject(Accumulators.scala:135)
    at java.io.ObjectInputStream.readObject(ObjectInputStream.java:349)
    ...
    at org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:62)
    at org.apache.spark.serializer.JavaSerializerInstance.deserialize(JavaSerializer.scala:87)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:57)

The corresponding code in ResultTask is:

override def runTask(context: TaskContext): U = {
    // Deserialize the RDD and the func using the broadcast variables.
    val ser = SparkEnv.get.closureSerializer.newInstance()
    val (rdd, func) = ser.deserialize[(RDD[T], (TaskContext, Iterator[T]) => U)](
      ByteBuffer.wrap(taskBinary.value), Thread.currentThread.getContextClassLoader)

    metrics = Some(context.taskMetrics)
    func(context, rdd.iterator(partition, context))
  }

The deserialization happens at the ser.deserialize line. While the function is being deserialized, the Accumulator's readObject is invoked:

  private def readObject(in: ObjectInputStream): Unit = Utils.tryOrIOException {
    in.defaultReadObject()
    value_ = zero
    deserialized = true
    Accumulators.register(this, false)
  }

In other words, on deserialization the accum object is rebuilt with its value reset to zero, and then registered in a thread-local registry on the executor.
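
As a rough mental model of what happens on the executor side (a sketch with invented names such as LocalAccums, not Spark's actual API): each task thread works on its own copy that starts from zero, and only the partial value is sent back with the task result for the driver to merge.

import scala.collection.mutable

// Illustrative sketch only; LocalAccums and its methods are invented names.
object LocalAccums {
  // one registry per task thread, analogous in spirit to what
  // Accumulators.register(this, false) maintains on the executor
  private val perThread = new ThreadLocal[mutable.Map[Long, Int]] {
    override def initialValue(): mutable.Map[Long, Int] = mutable.Map.empty
  }

  def register(id: Long): Unit = perThread.get()(id) = 0        // value_ = zero
  def add(id: Long, delta: Int): Unit = {
    val m = perThread.get()
    m(id) = m.getOrElse(id, 0) + delta                          // accum += x inside the task
  }
  def partialValues(): Map[Long, Int] = perThread.get().toMap   // shipped back with the task result
}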

See the related question on Stack Overflow.

In addition, here is a screenshot of the Spark Summit 2013 slide mentioned in that post:

In the slide, the lambda on the left references MyCoolRddApp's field param, so the whole MyCoolRddApp instance gets pulled into the closure, and serializing MyCoolRddApp then fails?
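
Reconstructed from memory rather than copied from the slide, the pattern it warns about looks roughly like this (everything except param is an assumption):

import org.apache.spark.rdd.RDD

// A sketch of the kind of code the slide illustrates (not the slide verbatim).
class MyCoolRddApp {
  val param = 3.14
  val log = new java.io.FileWriter("/tmp/app.log")   // not serializable

  def work(rdd: RDD[Int]): Double =
    // _ + param references a field, so the closure captures the whole
    // MyCoolRddApp instance -- including log -- and serializing it fails
    rdd.map(_ + param).reduce(_ + _)
}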

Closure Clean

Closure Serialization

The article Hygienic Closures for Scala Function Serialization explains how closures are serialized. According to it, when a closure defined inside a class body references a member of that class, Scala serializes the entire instance, whether or not the other members are used: "when a class member value shows up in a closure that is defined inside the class body, the entire instance, including any and all other member values, is included in the closure. Presumably this is because a class may have any number of instances, and the compiler is including the entire instance in the closure to properly resolve the correct member value"

So code like the following is a trap:

class foo() extends Serializable {
  val v = 42    // easy to serialize
  val w = 4.5   // easy to serialize
  val data = (1 to 1000000000).toList  // serialization landmine hiding in your closure

  // The returned function includes all of 'foo' instance in its closure
  def f() = (x: Int) => v * x
}
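
The usual workaround, along the lines of the article's advice (this is a sketch, and fooFixed is just my naming), is to copy the needed member into a local val so that the returned function captures only that value rather than the whole instance:

class fooFixed() extends Serializable {
  val v = 42
  val data = (1 to 1000000000).toList  // no longer dragged into the closure below

  // Copy the member into a local val; the returned function closes over vLocal only.
  def f() = {
    val vLocal = v
    (x: Int) => vLocal * x
  }
}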

Hence Spark has a closure cleaner; see this Quora question: What does Closure.cleaner (func) mean in Spark?
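
For context, RDD operations run the user function through the cleaner before serializing it. In Spark 1.x, RDD.map looks roughly like this (quoted from memory, so details may differ between versions):

def map[U: ClassTag](f: T => U): RDD[U] = withScope {
  val cleanF = sc.clean(f)  // ClosureCleaner nulls out outer references the closure does not actually use
  new MapPartitionsRDD[U, T](this, (context, pid, iter) => iter.map(cleanF))
}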

Below is the debug output produced by the following test code:

// class ClosureCleanerSuite extends SparkFunSuite 
  test("closures inside an object") {
    assert(TestObject.run() === 30) // 6 + 7 + 8 + 9
  }

object TestObject {
  def run(): Int = {
    var nonSer = new NonSerializable
    val x = 5
    withSpark(new SparkContext("local", "test")) { sc =>
      val nums = sc.parallelize(Array(1, 2, 3, 4))
      nums.map(_ + x).reduce(_ + _)
    }
  }
}

16/11/07 09:47:15.009 ScalaTest-main-running-ClosureCleanerSuite DEBUG ClosureCleaner: +++ Cleaning closure <function1> (org.apache.spark.util.TestObject$$anonfun$run$1$$anonfun$apply$37) +++
16/11/07 09:47:15.022 ScalaTest-main-running-ClosureCleanerSuite DEBUG ClosureCleaner:  + declared fields: 2
16/11/07 09:47:15.022 ScalaTest-main-running-ClosureCleanerSuite DEBUG ClosureCleaner:      public static final long org.apache.spark.util.TestObject$$anonfun$run$1$$anonfun$apply$37.serialVersionUID
16/11/07 09:47:15.023 ScalaTest-main-running-ClosureCleanerSuite DEBUG ClosureCleaner:      private final org.apache.spark.util.TestObject$$anonfun$run$1 org.apache.spark.util.TestObject$$anonfun$run$1$$anonfun$apply$37.$outer
16/11/07 09:47:15.023 ScalaTest-main-running-ClosureCleanerSuite DEBUG ClosureCleaner:  + declared methods: 3
16/11/07 09:47:15.023 ScalaTest-main-running-ClosureCleanerSuite DEBUG ClosureCleaner:      public int org.apache.spark.util.TestObject$$anonfun$run$1$$anonfun$apply$37.apply$mcII$sp(int)
16/11/07 09:47:15.023 ScalaTest-main-running-ClosureCleanerSuite DEBUG ClosureCleaner:      public final int org.apache.spark.util.TestObject$$anonfun$run$1$$anonfun$apply$37.apply(int)
16/11/07 09:47:15.023 ScalaTest-main-running-ClosureCleanerSuite DEBUG ClosureCleaner:      public final java.lang.Object org.apache.spark.util.TestObject$$anonfun$run$1$$anonfun$apply$37.apply(java.lang.Object)
16/11/07 09:47:15.024 ScalaTest-main-running-ClosureCleanerSuite DEBUG ClosureCleaner:  + inner classes: 0
16/11/07 09:47:15.024 ScalaTest-main-running-ClosureCleanerSuite DEBUG ClosureCleaner:  + outer classes: 1
16/11/07 09:47:15.025 ScalaTest-main-running-ClosureCleanerSuite DEBUG ClosureCleaner:      org.apache.spark.util.TestObject$$anonfun$run$1
16/11/07 09:47:15.025 ScalaTest-main-running-ClosureCleanerSuite DEBUG ClosureCleaner:  + outer objects: 1
16/11/07 09:47:15.026 ScalaTest-main-running-ClosureCleanerSuite DEBUG ClosureCleaner:      <function1>
16/11/07 09:47:15.027 ScalaTest-main-running-ClosureCleanerSuite DEBUG ClosureCleaner:  + populating accessed fields because this is the starting closure
16/11/07 09:47:15.030 ScalaTest-main-running-ClosureCleanerSuite DEBUG ClosureCleaner:  + fields accessed by starting closure: 1
16/11/07 09:47:15.031 ScalaTest-main-running-ClosureCleanerSuite DEBUG ClosureCleaner:      (class org.apache.spark.util.TestObject$$anonfun$run$1,Set(x$43))
16/11/07 09:47:15.031 ScalaTest-main-running-ClosureCleanerSuite DEBUG ClosureCleaner:  + outermost object is a closure, so we just keep it: (class org.apache.spark.util.TestObject$$anonfun$run$1,<function1>)
16/11/07 09:47:15.032 ScalaTest-main-running-ClosureCleanerSuite DEBUG ClosureCleaner:  + cloning the object <function1> of class org.apache.spark.util.TestObject$$anonfun$run$1
16/11/07 09:47:15.034 ScalaTest-main-running-ClosureCleanerSuite DEBUG ClosureCleaner:  + cleaning cloned closure <function1> recursively (org.apache.spark.util.TestObject$$anonfun$run$1)
16/11/07 09:47:15.034 ScalaTest-main-running-ClosureCleanerSuite DEBUG ClosureCleaner: +++ Cleaning closure <function1> (org.apache.spark.util.TestObject$$anonfun$run$1) +++
16/11/07 09:47:15.036 ScalaTest-main-running-ClosureCleanerSuite DEBUG ClosureCleaner:  + declared fields: 2
16/11/07 09:47:15.036 ScalaTest-main-running-ClosureCleanerSuite DEBUG ClosureCleaner:      public static final long org.apache.spark.util.TestObject$$anonfun$run$1.serialVersionUID
16/11/07 09:47:15.036 ScalaTest-main-running-ClosureCleanerSuite DEBUG ClosureCleaner:      public final int org.apache.spark.util.TestObject$$anonfun$run$1.x$43
16/11/07 09:47:15.036 ScalaTest-main-running-ClosureCleanerSuite DEBUG ClosureCleaner:  + declared methods: 2
16/11/07 09:47:15.036 ScalaTest-main-running-ClosureCleanerSuite DEBUG ClosureCleaner:      public final java.lang.Object org.apache.spark.util.TestObject$$anonfun$run$1.apply(java.lang.Object)
16/11/07 09:47:15.036 ScalaTest-main-running-ClosureCleanerSuite DEBUG ClosureCleaner:      public final int org.apache.spark.util.TestObject$$anonfun$run$1.apply(org.apache.spark.SparkContext)
16/11/07 09:47:15.036 ScalaTest-main-running-ClosureCleanerSuite DEBUG ClosureCleaner:  + inner classes: 2
16/11/07 09:47:15.037 ScalaTest-main-running-ClosureCleanerSuite DEBUG ClosureCleaner:      org.apache.spark.util.TestObject$$anonfun$run$1$$anonfun$apply$38
16/11/07 09:47:15.037 ScalaTest-main-running-ClosureCleanerSuite DEBUG ClosureCleaner:      org.apache.spark.util.TestObject$$anonfun$run$1$$anonfun$apply$37
16/11/07 09:47:15.037 ScalaTest-main-running-ClosureCleanerSuite DEBUG ClosureCleaner:  + outer classes: 0
16/11/07 09:47:15.037 ScalaTest-main-running-ClosureCleanerSuite DEBUG ClosureCleaner:  + outer objects: 0
16/11/07 09:47:15.037 ScalaTest-main-running-ClosureCleanerSuite DEBUG ClosureCleaner:  + fields accessed by starting closure: 1
16/11/07 09:47:15.038 ScalaTest-main-running-ClosureCleanerSuite DEBUG ClosureCleaner:      (class org.apache.spark.util.TestObject$$anonfun$run$1,Set(x$43))
16/11/07 09:47:15.038 ScalaTest-main-running-ClosureCleanerSuite DEBUG ClosureCleaner:  + there are no enclosing objects!
16/11/07 09:47:15.038 ScalaTest-main-running-ClosureCleanerSuite DEBUG ClosureCleaner:  +++ closure <function1> (org.apache.spark.util.TestObject$$anonfun$run$1) is now cleaned +++
16/11/07 09:47:15.038 ScalaTest-main-running-ClosureCleanerSuite DEBUG ClosureCleaner:  +++ closure <function1> (org.apache.spark.util.TestObject$$anonfun$run$1$$anonfun$apply$37) is now cleaned +++
16/11/07 09:47:15.047 ScalaTest-main-running-ClosureCleanerSuite DEBUG ClosureCleaner: +++ Cleaning closure <function2> (org.apache.spark.util.TestObject$$anonfun$run$1$$anonfun$apply$38) +++
16/11/07 09:47:15.047 ScalaTest-main-running-ClosureCleanerSuite DEBUG ClosureCleaner:  + declared fields: 1
16/11/07 09:47:15.047 ScalaTest-main-running-ClosureCleanerSuite DEBUG ClosureCleaner:      public static final long org.apache.spark.util.TestObject$$anonfun$run$1$$anonfun$apply$38.serialVersionUID
16/11/07 09:47:15.048 ScalaTest-main-running-ClosureCleanerSuite DEBUG ClosureCleaner:  + declared methods: 3
16/11/07 09:47:15.048 ScalaTest-main-running-ClosureCleanerSuite DEBUG ClosureCleaner:      public int org.apache.spark.util.TestObject$$anonfun$run$1$$anonfun$apply$38.apply$mcIII$sp(int,int)
16/11/07 09:47:15.048 ScalaTest-main-running-ClosureCleanerSuite DEBUG ClosureCleaner:      public final int org.apache.spark.util.TestObject$$anonfun$run$1$$anonfun$apply$38.apply(int,int)
16/11/07 09:47:15.048 ScalaTest-main-running-ClosureCleanerSuite DEBUG ClosureCleaner:      public final java.lang.Object org.apache.spark.util.TestObject$$anonfun$run$1$$anonfun$apply$38.apply(java.lang.Object,java.lang.Object)
16/11/07 09:47:15.048 ScalaTest-main-running-ClosureCleanerSuite DEBUG ClosureCleaner:  + inner classes: 0
16/11/07 09:47:15.048 ScalaTest-main-running-ClosureCleanerSuite DEBUG ClosureCleaner:  + outer classes: 0
16/11/07 09:47:15.048 ScalaTest-main-running-ClosureCleanerSuite DEBUG ClosureCleaner:  + outer objects: 0
16/11/07 09:47:15.048 ScalaTest-main-running-ClosureCleanerSuite DEBUG ClosureCleaner:  + populating accessed fields because this is the starting closure
16/11/07 09:47:15.049 ScalaTest-main-running-ClosureCleanerSuite DEBUG ClosureCleaner:  + fields accessed by starting closure: 0
16/11/07 09:47:15.049 ScalaTest-main-running-ClosureCleanerSuite DEBUG ClosureCleaner:  + there are no enclosing objects!
16/11/07 09:47:15.049 ScalaTest-main-running-ClosureCleanerSuite DEBUG ClosureCleaner:  +++ closure <function2> (org.apache.spark.util.TestObject$$anonfun$run$1$$anonfun$apply$38) is now cleaned +++
16/11/07 09:47:15.058 ScalaTest-main-running-ClosureCleanerSuite DEBUG ClosureCleaner: +++ Cleaning closure <function2> (org.apache.spark.SparkContext$$anonfun$36) +++
16/11/07 09:47:15.059 ScalaTest-main-running-ClosureCleanerSuite DEBUG ClosureCleaner:  + declared fields: 2
16/11/07 09:47:15.059 ScalaTest-main-running-ClosureCleanerSuite DEBUG ClosureCleaner:      public static final long org.apache.spark.SparkContext$$anonfun$36.serialVersionUID
16/11/07 09:47:15.059 ScalaTest-main-running-ClosureCleanerSuite DEBUG ClosureCleaner:      private final scala.Function1 org.apache.spark.SparkContext$$anonfun$36.processPartition$1
16/11/07 09:47:15.059 ScalaTest-main-running-ClosureCleanerSuite DEBUG ClosureCleaner:  + declared methods: 2
16/11/07 09:47:15.059 ScalaTest-main-running-ClosureCleanerSuite DEBUG ClosureCleaner:      public final java.lang.Object org.apache.spark.SparkContext$$anonfun$36.apply(java.lang.Object,java.lang.Object)
16/11/07 09:47:15.059 ScalaTest-main-running-ClosureCleanerSuite DEBUG ClosureCleaner:      public final java.lang.Object org.apache.spark.SparkContext$$anonfun$36.apply(org.apache.spark.TaskContext,scala.collection.Iterator)
16/11/07 09:47:15.059 ScalaTest-main-running-ClosureCleanerSuite DEBUG ClosureCleaner:  + inner classes: 0
16/11/07 09:47:15.060 ScalaTest-main-running-ClosureCleanerSuite DEBUG ClosureCleaner:  + outer classes: 0
16/11/07 09:47:15.060 ScalaTest-main-running-ClosureCleanerSuite DEBUG ClosureCleaner:  + outer objects: 0
16/11/07 09:47:15.060 ScalaTest-main-running-ClosureCleanerSuite DEBUG ClosureCleaner:  + populating accessed fields because this is the starting closure
16/11/07 09:47:15.060 ScalaTest-main-running-ClosureCleanerSuite DEBUG ClosureCleaner:  + fields accessed by starting closure: 0
16/11/07 09:47:15.060 ScalaTest-main-running-ClosureCleanerSuite DEBUG ClosureCleaner:  + there are no enclosing objects!
16/11/07 09:47:15.060 ScalaTest-main-running-ClosureCleanerSuite DEBUG ClosureCleaner:  +++ closure <function2> (org.apache.spark.SparkContext$$anonfun$36) is now cleaned +++
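
Reading the log: the starting closure (anonfun$apply$37, which is _ + x) accesses exactly one field, x$43, of its enclosing closure anonfun$run$1. Since the outermost object is itself a closure, the cleaner clones it and cleans the clone recursively, keeping only what is actually accessed; nothing else from run()'s scope (in particular the NonSerializable nonSer) is dragged into what gets serialized, which is why the test passes and returns 30.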
