AppendOnlyMap
package org.apache.spark.util.collection
这是一个二次探测(quadratic probing)的2次方hash表,这种开放地址的hash表删除的时候不能简单删除,因为空值以为着后面不需要查找了,所以这里是一个只增不删的hash table
reduceByKey
RDD的reduceByKey通过此HashMap合并key和value
hash
private def nextPowerOf2(n: Int): Int = {
val highBit = Integer.highestOneBit(n)
if (highBit == n) n else highBit << 1
}
Integer.highestOneBit(n)
的给出某个数x的最高位数,例如3(11)则是2(10),意思是给出最近的2^n数,负数就始终是-x80000000就是最高位为1的数
那么这个函数的意思是如果这个是是2^n数就是返回这个数,如果不是则是2^(n+1)
对于Java的Integer.highestOneBit
方法
public static int highestOneBit(int i) {
// HD, Figure 3-1
i |= (i >> 1); //得到数的最高2位为1
i |= (i >> 2);
i |= (i >> 4);
i |= (i >> 8);
i |= (i >> 16); //得到从最高位开始全部为1
return i - (i >>> 1); //减去不带符号向右1位
}
容量是初始值的下一个2^n,缺省64
private var capacity = nextPowerOf2(initialCapacity)
mask是容量少1位(除最高位全部为1)
private var mask = capacity - 1
hash算法
private def rehash(h: Int): Int = Hashing.murmur3_32().hashInt(h).asInt()
MurmurHash 非加密的hash,意思不会难以reverse,murmurhash3产生32位或128位hash值
得到地址是容量少1位
var pos = rehash(k.hashCode) & mask
查找
private var data = new Array[AnyRef](2 * capacity)
数组里成对分别放key和AnyRef
def apply(key: K): V = {
assert(!destroyed, destructionMessage)
val k = key.asInstanceOf[AnyRef]
if (k.eq(null)) {
return nullValue
}
var pos = rehash(k.hashCode) & mask
var i = 1
while (true) {
val curKey = data(2 * pos)
if (k.eq(curKey) || k.equals(curKey)) {
return data(2 * pos + 1).asInstanceOf[V]
} else if (curKey.eq(null)) { //如果数据里key是空的,则表示没有值,返回空
return null.asInstanceOf[V]
} else { //如果已有的key值和查找的不同表示冲突了,则顺序查找下一个
val delta = i
pos = (pos + delta) & mask //二次方探查在哪里???
i += 1
}
}
null.asInstanceOf[V]
}
会不会冲突的都已经满了的情况?
不会,因为在update方法里每次更新值的时候会incrementSize()
,当达到growThreshold值的时候就把表double size
test
AppendOnlyMapSuite.scala
class AppendOnlyMapSuite extends FunSuite {
test("initialization") {
val goodMap1 = new AppendOnlyMap[Int, Int](1)
assert(goodMap1.size === 0)
val goodMap2 = new AppendOnlyMap[Int, Int](255)
assert(goodMap2.size === 0)
val goodMap3 = new AppendOnlyMap[Int, Int](256)
assert(goodMap3.size === 0)
intercept[IllegalArgumentException] {
new AppendOnlyMap[Int, Int](1 << 30) // Invalid map size: bigger than 2^29
}
intercept[IllegalArgumentException] {
new AppendOnlyMap[Int, Int](-1)
}
intercept[IllegalArgumentException] {
new AppendOnlyMap[Int, Int](0)
}
}
test("null values") {
val map = new AppendOnlyMap[String, String]()
for (i <- 1 to 100) {
map("" + i) = null
}
assert(map.size === 100)
assert(map("1") === null)
assert(map(null) === null)
assert(map.size === 100)
map(null) = null
assert(map.size === 101)
assert(map(null) === null)
}
...