AppendOnlyMap

package org.apache.spark.util.collection

这是一个二次探测(quadratic probing)的2次方hash表,这种开放地址的hash表删除的时候不能简单删除,因为空值以为着后面不需要查找了,所以这里是一个只增不删的hash table

reduceByKey

RDD的reduceByKey通过此HashMap合并key和value

reduceByKey

hash

  private def nextPowerOf2(n: Int): Int = {
    val highBit = Integer.highestOneBit(n)
    if (highBit == n) n else highBit << 1
  }

Integer.highestOneBit(n)的给出某个数x的最高位数,例如3(11)则是2(10),意思是给出最近的2^n数,负数就始终是-x80000000就是最高位为1的数 那么这个函数的意思是如果这个是是2^n数就是返回这个数,如果不是则是2^(n+1)

对于Java的Integer.highestOneBit方法

    public static int highestOneBit(int i) {
        // HD, Figure 3-1
        i |= (i >>  1);  //得到数的最高2位为1
        i |= (i >>  2);
        i |= (i >>  4);
        i |= (i >>  8);
        i |= (i >> 16);  //得到从最高位开始全部为1
        return i - (i >>> 1);  //减去不带符号向右1位
    }

容量是初始值的下一个2^n,缺省64

private var capacity = nextPowerOf2(initialCapacity)

mask是容量少1位(除最高位全部为1)

private var mask = capacity - 1

hash算法

   private def rehash(h: Int): Int = Hashing.murmur3_32().hashInt(h).asInt()

MurmurHash 非加密的hash,意思不会难以reverse,murmurhash3产生32位或128位hash值

得到地址是容量少1位

var pos = rehash(k.hashCode) & mask

查找

   private var data = new Array[AnyRef](2 * capacity)

数组里成对分别放key和AnyRef

  def apply(key: K): V = {
    assert(!destroyed, destructionMessage)
    val k = key.asInstanceOf[AnyRef]
    if (k.eq(null)) {
      return nullValue
    }
    var pos = rehash(k.hashCode) & mask
    var i = 1
    while (true) {
      val curKey = data(2 * pos)
      if (k.eq(curKey) || k.equals(curKey)) {
        return data(2 * pos + 1).asInstanceOf[V]
      } else if (curKey.eq(null)) { //如果数据里key是空的,则表示没有值,返回空
        return null.asInstanceOf[V]
      } else {  //如果已有的key值和查找的不同表示冲突了,则顺序查找下一个
        val delta = i
        pos = (pos + delta) & mask  //二次方探查在哪里???
        i += 1
      }
    }
    null.asInstanceOf[V]
  }

会不会冲突的都已经满了的情况?

不会,因为在update方法里每次更新值的时候会incrementSize(),当达到growThreshold值的时候就把表double size

test

AppendOnlyMapSuite.scala

class AppendOnlyMapSuite extends FunSuite {
  test("initialization") {
    val goodMap1 = new AppendOnlyMap[Int, Int](1)
    assert(goodMap1.size === 0)
    val goodMap2 = new AppendOnlyMap[Int, Int](255)
    assert(goodMap2.size === 0)
    val goodMap3 = new AppendOnlyMap[Int, Int](256)
    assert(goodMap3.size === 0)
    intercept[IllegalArgumentException] {
      new AppendOnlyMap[Int, Int](1 << 30) // Invalid map size: bigger than 2^29
    }
    intercept[IllegalArgumentException] {
      new AppendOnlyMap[Int, Int](-1)
    }
    intercept[IllegalArgumentException] {
      new AppendOnlyMap[Int, Int](0)
    }
  }

  test("null values") {
    val map = new AppendOnlyMap[String, String]()
    for (i <- 1 to 100) {
      map("" + i) = null
    }
    assert(map.size === 100)
    assert(map("1") === null)
    assert(map(null) === null)
    assert(map.size === 100)
    map(null) = null
    assert(map.size === 101)
    assert(map(null) === null)
  }

  ...

results matching ""

    No results matching ""