Future
Future Doc: Failed futures store an instance of Throwable instead of the result value.
//ReceivedBlockHandler
def storeBlock(blockId: StreamBlockId, block: ReceivedBlock): ReceivedBlockStoreResult = {
// Store the block in the BlockManager
val storeInBlockManagerFuture = Future {
val putResult =
blockManager.putBytes(blockId, serializedBlock, effectiveStorageLevel, tellMaster = true)
if (!putResult.map { _._1 }.contains(blockId)) {
throw new SparkException(
s"Could not store $blockId to block manager with storage level $storageLevel")
}
}
// Write the block to the write-ahead log (WAL)
val storeInWriteAheadLogFuture = Future {
writeAheadLog.write(serializedBlock, clock.getTimeMillis())
}
// Combine the futures, wait for both to complete, and return the write ahead log record handle
// Zip the two futures into one; the second element of the pair is the WAL result?
val combinedFuture = storeInBlockManagerFuture.zip(storeInWriteAheadLogFuture).map(_._2)
}
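The zip-then-map pattern above can be sketched in isolation. This is only an illustration: `putInMemory` and `writeToLog` are hypothetical stand-ins, not Spark APIs. It suggests that the second element of the zipped pair is indeed the result of the second future.

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

object ZipSketch {
  // Hypothetical stand-ins for the two stores; these are not Spark APIs.
  def putInMemory(data: String): Unit = ()
  def writeToLog(data: String): String = s"handle-for-$data"

  def storeBoth(data: String): Future[String] = {
    val inMemory: Future[Unit] = Future { putInMemory(data) }
    val inLog: Future[String] = Future { writeToLog(data) }
    // zip yields Future[(Unit, String)]; _._2 keeps only the log handle.
    // If either future fails, the zipped future fails as well.
    inMemory.zip(inLog).map(_._2)
  }

  def main(args: Array[String]): Unit =
    println(Await.result(storeBoth("block-1"), 5.seconds))
}
```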
//SocketInputDStream
def receive() {
var socket: Socket = null
try {
socket = new Socket(host, port)
val iterator = bytesToObjects(socket.getInputStream())
while(!isStopped && iterator.hasNext) {
store(iterator.next)
}
...
} catch {
case e: java.net.ConnectException =>
restart("Error connecting to " + host + ":" + port, e)
case NonFatal(e) =>
logWarning("Error receiving data", e)
restart("Error receiving data", e)
} finally {
...
}
}
Definition
object Future {
def apply[T](body: =>T)(implicit execctx: ExecutionContext): Future[T] = impl.Future(body)
}
Trait Promise is an object which can be completed with a value or failed with an exception.
- @define promiseCompletion: If the promise has already been fulfilled, failed or has timed out, calling this method will throw an IllegalStateException.
- @define allowedThrowables: If the throwable used to fail this promise is an error, a control exception or an interrupted exception, it will be wrapped as a cause within an ExecutionException which will fail the promise.
- @define nonDeterministic: Note: Using this method may result in non-deterministic concurrent programs.
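A minimal sketch of the promiseCompletion rule quoted above, assuming only the standard library. The object and method names are made up for illustration. The second call to `success` is wrapped in `Try` so that the IllegalStateException can be observed rather than propagated.

```scala
import scala.concurrent.{Await, Promise}
import scala.concurrent.duration._
import scala.util.Try

object PromiseSketch {
  // Completes a promise, then attempts to complete it a second time.
  def completeTwice(): (Int, Boolean) = {
    val p = Promise[Int]()
    p.success(42)                          // the first completion wins
    val secondAttempt = Try(p.success(7))  // throws IllegalStateException
    val value = Await.result(p.future, 1.second)
    (value, secondAttempt.isFailure)
  }

  def main(args: Array[String]): Unit =
    println(completeTwice())
}
```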
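import scala.concurrent.Future
import scala.concurrent.ExecutionContext.Implicits.global

val s = "Hello"
// Future { ... } replaces the lowercase `future` helper, which was deprecated in Scala 2.11
val f: Future[String] = Future {
s + " future!"
}
f onSuccess {
case msg => println(msg)
}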
Multiple callbacks may be registered; there is no guarantee that they will be executed in a particular order.
The future may contain a throwable object and this means that the future failed. Futures obtained through combinators have the same exception as the future they were obtained from.[?] The following throwable objects are not contained in the future:
- Error - errors are not contained within futures
- InterruptedException - not contained within futures
- all scala.util.control.ControlThrowable except NonLocalReturnControl - not contained within futures
Instead, the future is completed with an ExecutionException with one of the exceptions above as the cause. If a future is failed with a scala.runtime.NonLocalReturnControl, it is completed with a value from that throwable instead.
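The claim that a future obtained through a combinator carries the same exception as its source can be checked with a small sketch. The names below are illustrative only. It covers an ordinary exception and does not exercise the wrapped cases (Error, InterruptedException, ControlThrowable).

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._
import scala.util.Try

object FailureSketch {
  // Returns the final state of a future derived from a failed one via map.
  def failedThroughMap(): Try[Int] = {
    val boom = new IllegalArgumentException("boom")
    val f: Future[Int] = Future { throw boom }
    val g: Future[Int] = f.map(_ + 1)  // the function is never applied
    Await.ready(g, 5.seconds)          // wait for completion without rethrowing
    g.value.get                        // a Failure holding the original exception
  }

  def main(args: Array[String]): Unit =
    println(failedThroughMap())
}
```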
val f = Future { 5 }
val g = Future { 3 }
val h = for {
x: Int <- f // returns Future(5)
y: Int <- g // returns Future(3)
} yield x + y
is translated to:
f flatMap { (x: Int) => g map { (y: Int) => x + y } }
object ExecutionContext {
def global: ExecutionContextExecutor = Implicits.global
object Implicits {
implicit lazy val global: ExecutionContextExecutor = impl.ExecutionContextImpl.fromExecutor(null: Executor)
}
}
We used ExecutionContext.global, which manages a thread pool using java.util.concurrent.ForkJoinPool, which it uses to perform the work encapsulated in the Futures.
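The global context is not the only option. As a hedged sketch, a Future can also run on an explicitly supplied executor. The fixed pool of two threads is an arbitrary choice for illustration, and the object and method names are made up.

```scala
import java.util.concurrent.Executors
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._

object CustomContextSketch {
  def runOnFixedPool(): Int = {
    val pool = Executors.newFixedThreadPool(2)
    // Wrap the Java executor so it can serve as the implicit ExecutionContext.
    implicit val ec: ExecutionContext = ExecutionContext.fromExecutorService(pool)
    try {
      val f = Future { 21 * 2 }  // runs on one of the pool's threads
      Await.result(f, 5.seconds)
    } finally pool.shutdown()    // release the threads once the work is done
  }

  def main(args: Array[String]): Unit =
    println(runOnFixedPool())
}
```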
Samples
def sleep(millis: Long) = {
Thread.sleep(millis)
}
// Busy work ;)
def doWork(index: Int) = {
sleep((math.random * 1000).toLong)
index
}
(1 to 5) foreach { index =>
val future = Future {
doWork(index)
}
future onSuccess {
case answer: Int => println(s"Success! returned: $answer")
}
future onFailure {
case th: Throwable => println(s"FAILURE! returned: $th")
}
}
onSuccess takes a PartialFunction, which means the cases do not have to cover every possible value.
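What "partial" means here can be shown without any futures. The sketch below uses made-up names and defines a PartialFunction that handles only even numbers. Inputs without a matching case are simply not handled, which is how a callback can ignore values it does not care about.

```scala
object PartialSketch {
  // Defined only for even inputs; odd inputs have no matching case.
  val onlyEven: PartialFunction[Int, String] = {
    case n if n % 2 == 0 => s"even $n"
  }

  // lift turns the partial function into a total one that returns an Option.
  def describe(n: Int): Option[String] = onlyEven.lift(n)

  def main(args: Array[String]): Unit = {
    println(describe(4))
    println(describe(3))
  }
}
```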
val futures = (0 to 9) map {
i => Future {
val s = i.toString
print(s)
s
}
}
val f = Future.reduce(futures)((s1, s2) => s1 + s2)
val n = Await.result(f, Duration.Inf)
case class ThatsOdd(i: Int) extends RuntimeException(s"odd $i received!")
import scala.util.{Try, Success, Failure}
val doComplete: PartialFunction[Try[String],Unit] = {
case s @ Success(_) => println(s)
case f @ Failure(_) => println(f)
}
val futures = (0 to 9) map {
case i if i % 2 == 0 => Future.successful(i.toString)
case i => Future.failed(ThatsOdd(i))
}
futures map (_ onComplete doComplete)
There is also Async, which lives in an optional package. Async Doc
A Future has an important property that it may only be assigned once. Once a Future object is given a value or an exception, it becomes in effect immutable: it can never be overwritten.
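The write-once property is easiest to observe through a Promise, the writable side of a Future. In this sketch (names are illustrative), `trySuccess` reports whether the assignment took effect instead of throwing, so the second attempt returns false and the stored value is unchanged.

```scala
import scala.concurrent.Promise
import scala.util.Try

object WriteOnceSketch {
  // Returns the outcome of two assignment attempts and the stored value.
  def attempts(): (Boolean, Boolean, Option[Try[String]]) = {
    val p = Promise[String]()
    val first = p.trySuccess("first")    // true: the future is now completed
    val second = p.trySuccess("second")  // false: the value cannot be overwritten
    (first, second, p.future.value)
  }

  def main(args: Array[String]): Unit =
    println(attempts())
}
```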