Partition
trait Partition extends Serializable {
  def index: Int
  override def hashCode(): Int = index
}
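As a minimal sketch (the class name SimplePartition is hypothetical, not from the Spark source), a concrete Partition only needs to expose its index:

import org.apache.spark.Partition

// Hypothetical minimal Partition: identified solely by its index, so the
// hashCode defined in the trait above is simply that index.
class SimplePartition(override val index: Int) extends Partition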
RDD.scala
/**
 * Implemented by subclasses to return the set of partitions in this RDD. This method will only
 * be called once, so it is safe to implement a time-consuming computation in it.
 */
protected def getPartitions: Array[Partition]
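A hedged sketch of how a subclass might satisfy this contract, using the hypothetical SimplePartition above and an RDD backed by an in-memory sequence (not taken from the Spark source):

import scala.reflect.ClassTag
import org.apache.spark.{Partition, SparkContext, TaskContext}
import org.apache.spark.rdd.RDD

// Hypothetical sketch: an RDD over a local sequence, split into numSlices
// partitions. getPartitions just enumerates the partition indices once.
class SeqRDD[T: ClassTag](sc: SparkContext, data: Seq[T], numSlices: Int)
  extends RDD[T](sc, Nil) {

  override protected def getPartitions: Array[Partition] =
    Array.tabulate[Partition](numSlices)(i => new SimplePartition(i))

  override def compute(split: Partition, context: TaskContext): Iterator[T] = {
    // Each partition reads a contiguous slice of the sequence.
    val sliceSize = math.ceil(data.length.toDouble / numSlices).toInt
    data.slice(split.index * sliceSize, (split.index + 1) * sliceSize).iterator
  }
}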
For Hadoop, a Partition corresponds to an InputSplit.
HadoopRDD.scala
override def getPartitions: Array[Partition] = {
  val jobConf = getJobConf()
  // add the credentials here as this can be called before SparkContext initialized
  SparkHadoopUtil.get.addCredentials(jobConf)
  val inputFormat = getInputFormat(jobConf)
  if (inputFormat.isInstanceOf[Configurable]) {
    inputFormat.asInstanceOf[Configurable].setConf(jobConf)
  }
  val inputSplits = inputFormat.getSplits(jobConf, minPartitions)
  val array = new Array[Partition](inputSplits.size)
  for (i <- 0 until inputSplits.size) {
    array(i) = new HadoopPartition(id, i, inputSplits(i))
  }
  array
}
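The HadoopPartition created above is defined in HadoopRDD.scala; roughly, it pairs the partition index with the InputSplit it will read. A simplified sketch with a hypothetical name (the real class also takes care of serializing the Writable split):

import org.apache.hadoop.mapred.InputSplit
import org.apache.spark.Partition

// Simplified, hypothetical sketch of a Hadoop-backed partition: it carries the
// owning RDD's id, its index, and the InputSplit produced by getSplits.
class HadoopPartitionSketch(rddId: Int, override val index: Int, val split: InputSplit)
  extends Partition {
  // Mix the RDD id into the hash so partitions of different RDDs don't collide.
  override def hashCode(): Int = 31 * (31 + rddId) + index
}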
InputFormat is a Hadoop interface; FileInputFormat, for example, is one of its implementations.
InputFormat.java
/**
 * Logically split the set of input files for the job.
 *
 * <p>Each {@link InputSplit} is then assigned to an individual {@link Mapper}
 * for processing.</p>
 *
 * <p><i>Note</i>: The split is a <i>logical</i> split of the inputs and the
 * input files are not physically split into chunks. For e.g. a split could
 * be <i><input-file-path, start, offset></i> tuple.
 *
 * @param job job configuration.
 * @param numSplits the desired number of splits, a hint.
 * @return an array of {@link InputSplit}s for the job.
 */
InputSplit[] getSplits(JobConf job, int numSplits) throws IOException;
A HadoopPartition is then constructed from each InputSplit.
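To see the chain end to end, a short usage sketch (the path is hypothetical, and since numSplits is only a hint, the actual partition count depends on file sizes and block boundaries):

// Assuming an existing SparkContext named sc. textFile builds a Hadoop-backed RDD,
// so the partition count ultimately comes from InputFormat.getSplits with
// minPartitions passed as the numSplits hint.
val lines = sc.textFile("hdfs:///path/to/input", minPartitions = 4)
println(lines.partitions.length)  // typically >= 4, but ultimately decided by getSplits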