Shared Variables
Read-only broadcast variables
Broadcast gives every node a copy of a large input dataset in an efficient manner, instead of shipping the variable to the executors inside every task.
SparkContext.scala
def broadcast[T: ClassTag](value: T): Broadcast[T] = {
  val bc = env.broadcastManager.newBroadcast[T](value, isLocal)
  val callSite = getCallSite
  logInfo("Created broadcast " + bc.id + " from " + callSite.shortForm)
  cleaner.foreach(_.registerBroadcastForCleanup(bc))
  bc
}
There are two Broadcast implementations: HttpBroadcast and TorrentBroadcast.
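A minimal usage sketch (assuming a live SparkContext named sc and the Spark 1.x API): the driver broadcasts a lookup table once, and every task reads it through .value instead of capturing the table in its closure.

// Hypothetical lookup table that would otherwise be shipped with each task.
val lookup = Map(1 -> "a", 2 -> "b", 3 -> "c")
val bcLookup = sc.broadcast(lookup)   // Broadcast[Map[Int, String]]

val named = sc.parallelize(Seq(1, 2, 3))
  .map(i => bcLookup.value.getOrElse(i, "unknown"))
named.collect()   // Array("a", "b", "c")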
Accumulators
Accumulators are variables that are only “added” to through an associative operation and can therefore be efficiently supported in parallel. They can be used to implement counters (as in MapReduce) or sums. An accumulator is created from an initial value v by calling SparkContext.accumulator(v). Tasks running on the cluster can then add to it using the add method or the += operator (in Scala and Python). However, they cannot read its value. Only the driver program can read the accumulator’s value, using its value method.
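A minimal sketch of that contract (assuming a live SparkContext sc); the same accum reappears in the walkthrough below:

// Driver side: create an Int accumulator with initial value 0.
val accum = sc.accumulator(0)

// Executor side: tasks may only add to it.
sc.parallelize(Array(1, 2, 3, 4)).foreach(x => accum += x)

// Driver side: only here can the accumulated value be read.
println(accum.value)   // 10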
SparkContext.scala
implicit object DoubleAccumulatorParam extends AccumulatorParam[Double] {
  def addInPlace(t1: Double, t2: Double): Double = t1 + t2
  def zero(initialValue: Double) = 0.0
}

implicit object IntAccumulatorParam extends AccumulatorParam[Int] {
  def addInPlace(t1: Int, t2: Int): Int = t1 + t2
  def zero(initialValue: Int) = 0
}

implicit object LongAccumulatorParam extends AccumulatorParam[Long] {
  def addInPlace(t1: Long, t2: Long) = t1 + t2
  def zero(initialValue: Long) = 0L
}

implicit object FloatAccumulatorParam extends AccumulatorParam[Float] {
  def addInPlace(t1: Float, t2: Float) = t1 + t2
  def zero(initialValue: Float) = 0f
}

def accumulator[T](initialValue: T)(implicit param: AccumulatorParam[T]) =
  new Accumulator(initialValue, param)
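The same pattern extends to user-defined types by supplying an AccumulatorParam for them. A sketch for a hypothetical two-component vector type (Vec2 and Vec2AccumulatorParam are illustrative names, not Spark API), again assuming a SparkContext sc:

import org.apache.spark.AccumulatorParam

case class Vec2(x: Double, y: Double)

// Mirror the built-in params: define how two partial values merge and what "zero" is.
implicit object Vec2AccumulatorParam extends AccumulatorParam[Vec2] {
  def addInPlace(v1: Vec2, v2: Vec2): Vec2 = Vec2(v1.x + v2.x, v1.y + v2.y)
  def zero(initialValue: Vec2): Vec2 = Vec2(0.0, 0.0)
}

val vecAccum = sc.accumulator(Vec2(0.0, 0.0))
sc.parallelize(Seq(Vec2(1.0, 2.0), Vec2(3.0, 4.0))).foreach(v => vecAccum += v)
vecAccum.value   // Vec2(4.0, 6.0)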
For example, given the code
sc.parallelize(Array(1, 2, 3, 4)).foreach(x => accum += x)
the action here is foreach, and only an action launches a job and triggers computation. foreach is implemented as:
def foreach(f: T => Unit) {
  sc.runJob(this, (iter: Iterator[T]) => iter.foreach(f))
}
The corresponding func is iter => iter.foreach(f); after a series of wrappings it reaches runJob:
def runJob[T, U: ClassTag](
    rdd: RDD[T],
    func: (TaskContext, Iterator[T]) => U,
    partitions: Seq[Int],
    allowLocal: Boolean,
    resultHandler: (Int, U) => Unit) {
  if (dagScheduler == null) {
    throw new SparkException("SparkContext has been shutdown")
  }
  val callSite = getCallSite
  val cleanedFunc = clean(func)
  logInfo("Starting job: " + callSite.shortForm)
  dagScheduler.runJob(rdd, cleanedFunc, partitions, callSite, allowLocal,
    resultHandler, localProperties.get)
  progressBar.foreach(_.finishAll())
  rdd.doCheckpoint()
}
At this point func has become (ctx, iter) => iter.foreach(f). In submitJob, func is then cast to func2 of type (TaskContext, Iterator[_]) => _. What does the _ refer to? It is Scala's existential (wildcard) type: the cast erases the concrete element and result types so that the DAGScheduler can handle jobs of any type uniformly.
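A minimal standalone sketch of that cast (the function body is made up for illustration; only the types matter):

import org.apache.spark.TaskContext

// A concretely typed task function, like the one built by foreach.
val f: (TaskContext, Iterator[Int]) => Unit = (ctx, iter) => iter.foreach(println)

// The cast only forgets the static types (Int, Unit); the underlying closure
// object is unchanged and still does the same work when invoked.
val f2 = f.asInstanceOf[(TaskContext, Iterator[_]) => _]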
def submitJob[T, U](
    rdd: RDD[T],
    func: (TaskContext, Iterator[T]) => U,
    partitions: Seq[Int],
    callSite: CallSite,
    allowLocal: Boolean,
    resultHandler: (Int, U) => Unit,
    properties: Properties = null): JobWaiter[U] =
{
  val func2 = func.asInstanceOf[(TaskContext, Iterator[_]) => _]
  eventProcessActor ! JobSubmitted(
    jobId, rdd, func2, partitions.toArray, allowLocal, callSite, waiter, properties)
  waiter
}
In the DAGScheduler, this func is attached to the job:
private[scheduler] def handleJobSubmitted(jobId: Int,
    finalRDD: RDD[_],
    func: (TaskContext, Iterator[_]) => _,
    partitions: Array[Int],
    allowLocal: Boolean,
    callSite: CallSite,
    listener: JobListener,
    properties: Properties = null)
{
  var finalStage: Stage = null
  finalStage = newStage(finalRDD, partitions.size, None, jobId, callSite)
  if (finalStage != null) {
    val job = new ActiveJob(jobId, finalStage, func, partitions, callSite, listener, properties)
    ...
When the tasks are submitted, this func is broadcast to the executors:
private def submitMissingTasks(stage: Stage, jobId: Int) {
  var taskBinary: Broadcast[Array[Byte]] = null
  try {
    // For ShuffleMapTask, serialize and broadcast (rdd, shuffleDep).
    // For ResultTask, serialize and broadcast (rdd, func).
    // Serialize the func together with the RDD.
    val taskBinaryBytes: Array[Byte] =
      if (stage.isShuffleMap) {
        closureSerializer.serialize((stage.rdd, stage.shuffleDep.get) : AnyRef).array()
      } else {
        closureSerializer.serialize((stage.rdd, stage.resultOfJob.get.func) : AnyRef).array()
      }
    // == broadcast ==
    taskBinary = sc.broadcast(taskBinaryBytes)
  } catch {
    ...
  }
  val tasks: Seq[Task[_]] = if (stage.isShuffleMap) {
    ...
  } else {
    val job = stage.resultOfJob.get
    partitionsToCompute.map { id =>
      val p: Int = job.partitions(id)
      val part = stage.rdd.partitions(p)
      val locs = getPreferredLocs(stage.rdd, p)
      new ResultTask(stage.id, taskBinary, part, locs, id)
    }
  }
When a task executes, this func is deserialized from the broadcast. At that point func should be (ctx, iter) => iter.foreach(f), i.e.
(ctx, iter) => iter.foreach(x => accum += x)
One subtle point is that accum, captured inside this closure, must itself deserialize correctly on the executor; Accumulators.scala handles this in readObject, shown at the end.
ResultTask.scala
override def runTask(context: TaskContext): U = {
  // Deserialize the RDD and the func using the broadcast variables.
  val ser = SparkEnv.get.closureSerializer.newInstance()
  val (rdd, func) = ser.deserialize[(RDD[T], (TaskContext, Iterator[T]) => U)](
    ByteBuffer.wrap(taskBinary.value), Thread.currentThread.getContextClassLoader)
  metrics = Some(context.taskMetrics)
  func(context, rdd.iterator(partition, context))
}
Deserialization resets the accumulator's value to its zero value and registers the accumulator in a thread-local registry, so that its partial updates can be collected after the task finishes and sent back to the driver.
Accumulators.scala
private def readObject(in: ObjectInputStream): Unit = Utils.tryOrIOException {
  in.defaultReadObject()
  value_ = zero
  deserialized = true
  Accumulators.register(this, false)
}