tools

compile

hxm@hxm-desktop:/opt/Spark/spark$ export MAVEN_OPTS="-Xmx2g -XX:MaxPermSize=512M -XX:ReservedCodeCacheSize=512m"

hxm@hxm-desktop:/opt/Spark/spark/streaming$ ../build/mvn -Dtest=none -Dsuites=org.apache.spark.streaming.scheduler.rate.RateEstimator2Suite test

test

SparkSubmitSuite.scala

class SparkSubmitSuite extends FunSuite with Matchers {
  test("SPARK_CONF_DIR overrides spark-defaults.conf") {
    forConfDir(Map("spark.executor.memory" -> "2.3g")) { path =>
        ...
      assert(appArgs.propertiesFile != null)
      assert(appArgs.propertiesFile.startsWith(path))
      appArgs.executorMemory should  be ("2.3g")
    }
  }

  test("handles arguments with --key=val") {
    val clArgs = Seq(
      "--jars=one.jar,two.jar,three.jar",
      "--name=myApp")
    val appArgs = new SparkSubmitArguments(clArgs)
    appArgs.jars should include regex (".*one.jar,.*two.jar,.*three.jar")
    appArgs.name should be ("myApp")
  }

 ...
}

test方法原型定义如下 FunSuiteLike.scala

protected def test(testName: String, testTags: Tag*)(testFun: => Unit) {
    engine.registerTest(testName, Transformer(testFun _), "testCannotAppearInsideAnotherTest", "FunSuite.scala", "test", 4, -2, None, None, None, testTags: _*)
  }

所以上面test("xxx")实际是调用test方法用name参数以及后面{...}就是testFun参数了

shouldbeorg.scalatest.Machters里定义如下

sealed class AnyShouldWrapper[T](val leftSideValue: T) {
def shouldBe(right: Any) {
      if (!areEqualComparingArraysStructurally(leftSideValue, right)) {
        val (leftee, rightee) = Suite.getObjectsForFailureMessage(leftSideValue, right)
        throw newTestFailedException(FailureMessages("wasNotEqualTo", leftee, rightee))
      }
    }

用法

se.compute(Milliseconds(100), 100) shouldBe None

原因是在

trait Matchers extends Assertions with Tolerance with ShouldVerb with MatcherWords with Explicitly { matchers =>
import scala.language.implicitConversions

implicit def convertToAnyShouldWrapper[T](o: T): AnyShouldWrapper[T] = new AnyShouldWrapper(o)
}

object Matchers extends Matchers

ScalaTest matchers provides five different ways to check equality, each designed to address a different need. They are:

   result should equal (3) // can customize equality
   result should === (3)   // can customize equality and enforce type constraints
   result should be (3)    // cannot customize equality, so fastest to compile
   result shouldEqual 3    // can customize equality, no parentheses required
   result shouldBe 3       // cannot customize equality, so fastest to compile, no parentheses required

includebe定义在MacherWords.scala

val include = new IncludeWord
val be = new BeWord

intercept

RDDSuite.scala

 test("caching with failures") {
    val onlySplit = new Partition { override def index: Int = 0 }
    var shouldFail = true
    val rdd = new RDD[Int](sc, Nil) {
      override def getPartitions: Array[Partition] = Array(onlySplit)
      override val getDependencies = List[Dependency[_]]()
      override def compute(split: Partition, context: TaskContext): Iterator[Int] = {
        throw new Exception("injected failure")
      }
    }.cache()
    val thrown = intercept[Exception]{
      rdd.collect()
    }
    assert(thrown.getMessage.contains("injected failure"))
  }

Assertions.scala

def intercept[T <: AnyRef](f: => Any)(implicit manifest: Manifest[T]): T = {

[?]manifest

 def arr[T] = new Array[T](0)                          // does not compile
  def arr[T](implicit m: Manifest[T]) = new Array[T](0) // compiles
  def arr[T: Manifest] = new Array[T](0)                // shorthand for the preceding
class DoubleRDDSuite extends FunSuite with SharedSparkContext {

trait SharedSparkContext extends BeforeAndAfterAll { self: Suite =>

[ScalaCheck]

Eventually 周期性调用by-name参数直到成功,除非定义了timeout

//RateControllerSuite
import org.scalatest.concurrent.Eventually._
import org.scalatest.time.SpanSugar._
test("RateController - rate controller publishes updates after batches complete") {
    val ssc = new StreamingContext(conf, batchDuration)
    withStreamingContext(ssc) { ssc =>
      val dstream = new RateTestInputDStream(ssc)
      dstream.register()
      ssc.start()

      //用了多个implicits
      eventually(timeout(10.seconds)) {
        assert(dstream.publishedRates > 0)
      }
    }
  }

//eventually定义
def eventually[T](timeout: Timeout)(fun: => T)(implicit config: PatienceConfig): T =

//trait PatienceConfiguration
private val defaultPatienceConfig = PatienceConfig()
implicit def patienceConfig = defaultPatienceConfig
def timeout(value: Span) = Timeout(value)

//SpanSugar
class GrainOfTime(value: Long) {
    def seconds: Span = Span(value, Seconds)
}
implicit def convertLongToGrainOfTime(i: Long) = new GrainOfTime(i)
def seconds: Span = Span(value, Seconds)
object SpanSugar extends SpanSugar

所以10.seconds就是拓展成了new GrainOfTime(10L).seconds() timeout(...)则是拓展成了defaultPatienceConfig.timeout()

after

用来在每一个test执行后进行清理工作

//StreamingContextSuite
after {
    if (ssc != null) {
      ssc.stop()
      ssc = null
    }
    if (sc != null) {
      sc.stop()
      sc = null
    }
  }

  //定义在org.scalatest.BeforeAndAfter
  protected def after(fun: => Any) {
  }

results matching ""

    No results matching ""