tools
compile
hxm@hxm-desktop:/opt/Spark/spark$ export MAVEN_OPTS="-Xmx2g -XX:MaxPermSize=512M -XX:ReservedCodeCacheSize=512m"
hxm@hxm-desktop:/opt/Spark/spark/streaming$ ../build/mvn -Dtest=none -Dsuites=org.apache.spark.streaming.scheduler.rate.RateEstimator2Suite test
test
SparkSubmitSuite.scala
class SparkSubmitSuite extends FunSuite with Matchers {
test("SPARK_CONF_DIR overrides spark-defaults.conf") {
forConfDir(Map("spark.executor.memory" -> "2.3g")) { path =>
...
assert(appArgs.propertiesFile != null)
assert(appArgs.propertiesFile.startsWith(path))
appArgs.executorMemory should be ("2.3g")
}
}
test("handles arguments with --key=val") {
val clArgs = Seq(
"--jars=one.jar,two.jar,three.jar",
"--name=myApp")
val appArgs = new SparkSubmitArguments(clArgs)
appArgs.jars should include regex (".*one.jar,.*two.jar,.*three.jar")
appArgs.name should be ("myApp")
}
...
}
test方法原型定义如下 FunSuiteLike.scala
protected def test(testName: String, testTags: Tag*)(testFun: => Unit) {
engine.registerTest(testName, Transformer(testFun _), "testCannotAppearInsideAnotherTest", "FunSuite.scala", "test", 4, -2, None, None, None, testTags: _*)
}
所以上面test("xxx")实际是调用test方法用name参数以及后面{...}就是testFun参数了
shouldbe
在org.scalatest.Machters
里定义如下
sealed class AnyShouldWrapper[T](val leftSideValue: T) {
def shouldBe(right: Any) {
if (!areEqualComparingArraysStructurally(leftSideValue, right)) {
val (leftee, rightee) = Suite.getObjectsForFailureMessage(leftSideValue, right)
throw newTestFailedException(FailureMessages("wasNotEqualTo", leftee, rightee))
}
}
用法
se.compute(Milliseconds(100), 100) shouldBe None
原因是在
trait Matchers extends Assertions with Tolerance with ShouldVerb with MatcherWords with Explicitly { matchers =>
import scala.language.implicitConversions
implicit def convertToAnyShouldWrapper[T](o: T): AnyShouldWrapper[T] = new AnyShouldWrapper(o)
}
object Matchers extends Matchers
ScalaTest matchers provides five different ways to check equality, each designed to address a different need. They are:
result should equal (3) // can customize equality
result should === (3) // can customize equality and enforce type constraints
result should be (3) // cannot customize equality, so fastest to compile
result shouldEqual 3 // can customize equality, no parentheses required
result shouldBe 3 // cannot customize equality, so fastest to compile, no parentheses required
include
和be
定义在MacherWords.scala
val include = new IncludeWord
val be = new BeWord
intercept
RDDSuite.scala
test("caching with failures") {
val onlySplit = new Partition { override def index: Int = 0 }
var shouldFail = true
val rdd = new RDD[Int](sc, Nil) {
override def getPartitions: Array[Partition] = Array(onlySplit)
override val getDependencies = List[Dependency[_]]()
override def compute(split: Partition, context: TaskContext): Iterator[Int] = {
throw new Exception("injected failure")
}
}.cache()
val thrown = intercept[Exception]{
rdd.collect()
}
assert(thrown.getMessage.contains("injected failure"))
}
Assertions.scala
def intercept[T <: AnyRef](f: => Any)(implicit manifest: Manifest[T]): T = {
[?]manifest
def arr[T] = new Array[T](0) // does not compile
def arr[T](implicit m: Manifest[T]) = new Array[T](0) // compiles
def arr[T: Manifest] = new Array[T](0) // shorthand for the preceding
class DoubleRDDSuite extends FunSuite with SharedSparkContext {
trait SharedSparkContext extends BeforeAndAfterAll { self: Suite =>
[ScalaCheck]
Eventually
周期性调用by-name参数直到成功,除非定义了timeout
//RateControllerSuite
import org.scalatest.concurrent.Eventually._
import org.scalatest.time.SpanSugar._
test("RateController - rate controller publishes updates after batches complete") {
val ssc = new StreamingContext(conf, batchDuration)
withStreamingContext(ssc) { ssc =>
val dstream = new RateTestInputDStream(ssc)
dstream.register()
ssc.start()
//用了多个implicits
eventually(timeout(10.seconds)) {
assert(dstream.publishedRates > 0)
}
}
}
//eventually定义
def eventually[T](timeout: Timeout)(fun: => T)(implicit config: PatienceConfig): T =
//trait PatienceConfiguration
private val defaultPatienceConfig = PatienceConfig()
implicit def patienceConfig = defaultPatienceConfig
def timeout(value: Span) = Timeout(value)
//SpanSugar
class GrainOfTime(value: Long) {
def seconds: Span = Span(value, Seconds)
}
implicit def convertLongToGrainOfTime(i: Long) = new GrainOfTime(i)
def seconds: Span = Span(value, Seconds)
object SpanSugar extends SpanSugar
所以10.seconds
就是拓展成了new GrainOfTime(10L).seconds()
timeout(...)
则是拓展成了defaultPatienceConfig.timeout()
after
用来在每一个test执行后进行清理工作
//StreamingContextSuite
after {
if (ssc != null) {
ssc.stop()
ssc = null
}
if (sc != null) {
sc.stop()
sc = null
}
}
//定义在org.scalatest.BeforeAndAfter
protected def after(fun: => Any) {
}