Try Some None
HadoopRDD.scala
override def getPreferredLocations(split: Partition): Seq[String] = {
  val locs: Option[Seq[String]] = HadoopRDD.SPLIT_INFO_REFLECTIONS match {
    case Some(c) =>
      // The newer Hadoop location-info classes are available: call them via reflection.
      try {
        val lsplit = c.inputSplitWithLocationInfo.cast(hsplit)
        val infos = c.getLocationInfo.invoke(lsplit).asInstanceOf[Array[AnyRef]]
        Some(HadoopRDD.convertSplitLocationInfo(infos))
      } catch {
        case e: Exception =>
          logDebug("Failed to use InputSplitWithLocations.", e)
          None
      }
    case None => None
  }
  // Fall back to the old API, dropping "localhost" entries.
  locs.getOrElse(hsplit.getLocations.filter(_ != "localhost"))
}
Q: Why the HadoopRDD. qualifier? Without it, SPLIT_INFO_REFLECTIONS cannot be resolved: it is defined on the HadoopRDD companion object (shown below), and in Scala a companion object's members are not automatically in scope inside the class, so class code must reference them with the qualifier.
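A minimal sketch of this scoping rule, using a hypothetical class Foo:

object Foo {
  val Answer = 42
}

class Foo {
  // def bad = Answer    // does not compile: companion members are not in scope
  def good = Foo.Answer  // compiles: qualified through the companion object
}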
locs may be either Some or None, so getOrElse supplies the fallback. Its argument is passed by name, evaluated only when locs is None; here it calls the old hsplit.getLocations API and filters out the local node ("localhost").
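The same try/Some/None-plus-getOrElse idiom in a self-contained sketch (parseIntOpt is a hypothetical name, not Spark code):

import scala.util.control.NonFatal

def parseIntOpt(s: String): Option[Int] =
  try {
    Some(s.toInt)            // success: wrap the value in Some
  } catch {
    case NonFatal(_) => None // failure: return None instead of throwing
  }

// getOrElse's argument is by-name: it is only evaluated when the Option is None.
val n = parseIntOpt("not-a-number").getOrElse(0) // n == 0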
// Probed once at class-load time: Some if the newer Hadoop classes are on
// the classpath, None otherwise.
private[spark] val SPLIT_INFO_REFLECTIONS: Option[SplitInfoReflections] = try {
  Some(new SplitInfoReflections)
} catch {
  case e: Exception =>
    logDebug("SplitLocationInfo and other new Hadoop classes are " +
      "unavailable. Using the older Hadoop location info code.", e)
    None
}
private[spark] class SplitInfoReflections {
  val inputSplitWithLocationInfo =
    Class.forName("org.apache.hadoop.mapred.InputSplitWithLocationInfo")
  val getLocationInfo = inputSplitWithLocationInfo.getMethod("getLocationInfo")
  val newInputSplit = Class.forName("org.apache.hadoop.mapreduce.InputSplit")
  val newGetLocationInfo = newInputSplit.getMethod("getLocationInfo")
  val splitLocationInfo = Class.forName("org.apache.hadoop.mapred.SplitLocationInfo")
  val isInMemory = splitLocationInfo.getMethod("isInMemory")
  val getLocation = splitLocationInfo.getMethod("getLocation")
}
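The Class.forName/getMethod/invoke pattern above, in a minimal runnable sketch that substitutes java.lang.String (guaranteed to be on the classpath) for the Hadoop classes:

object ReflectionSketch {
  def main(args: Array[String]): Unit = {
    val clazz  = Class.forName("java.lang.String")
    val length = clazz.getMethod("length")
    // Method.invoke returns AnyRef; length()'s Int result comes back boxed
    // as java.lang.Integer.
    val n = length.invoke("hello").asInstanceOf[Integer]
    println(n) // prints 5
  }
}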