Future

Future Doc: Failed futures store an instance of Throwable instead of the result value.
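A minimal sketch of what "storing a Throwable" looks like in practice, assuming the default global ExecutionContext. The exception type and message are made up for illustration.

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._
import scala.util.{Failure, Success}

// A future whose body throws ends up holding the Throwable, not a result.
val failing: Future[Int] = Future { throw new IllegalStateException("boom") }

// Await.ready waits for completion without rethrowing the stored exception.
Await.ready(failing, 5.seconds)

// value is an Option[Try[Int]]: None while pending, Some(...) once completed.
val outcome: String = failing.value match {
  case Some(Failure(t)) => s"failed with ${t.getClass.getSimpleName}: ${t.getMessage}"
  case Some(Success(v)) => s"succeeded with $v"
  case None             => "not completed yet"
}
println(outcome)
```

Await.result on the same future would rethrow the stored exception instead of returning it.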

//ReceivedBlockHandler
def storeBlock(blockId: StreamBlockId, block: ReceivedBlock): ReceivedBlockStoreResult = {
    // Put the block into the BlockManager
    val storeInBlockManagerFuture = Future {
      val putResult =
        blockManager.putBytes(blockId, serializedBlock, effectiveStorageLevel, tellMaster = true)
      if (!putResult.map { _._1 }.contains(blockId)) {
        throw new SparkException(
          s"Could not store $blockId to block manager with storage level $storageLevel")
      }
    }

    // Write the block to the write-ahead log (WAL)
    val storeInWriteAheadLogFuture = Future {
      writeAheadLog.write(serializedBlock, clock.getTimeMillis())
    }
    // Combine the futures, wait for both to complete, and return the write ahead log record handle
    // Combine the two; the second value is the WAL result?
    val combinedFuture = storeInBlockManagerFuture.zip(storeInWriteAheadLogFuture).map(_._2)
}
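The zip-then-map step in storeBlock can be tried in isolation. This is only a sketch with hypothetical stand-ins for the two stores (the handle string is invented); it suggests that zip produces a pair and that map(_._2) keeps the second element, which in storeBlock would be the write-ahead log result.

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

// Hypothetical stand-ins for the two stores in storeBlock.
val storeInMemory: Future[Unit] = Future { () }            // plays the role of the block manager put
val storeInLog: Future[String] = Future { "wal-record-1" } // plays the role of the WAL write; made-up handle

// zip yields Future[(Unit, String)]; map(_._2) keeps only the second element.
val combined: Future[String] = storeInMemory.zip(storeInLog).map(_._2)
val handle: String = Await.result(combined, 5.seconds)

// If either side fails, the zipped future fails as well.
val failedSide: Future[Unit] = Future.failed(new RuntimeException("put failed"))
val combinedFailure: Future[String] = failedSide.zip(storeInLog).map(_._2)
Await.ready(combinedFailure, 5.seconds)
val failed: Boolean = combinedFailure.value.exists(_.isFailure)
```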


//SocketInputDStream
def receive() {
    var socket: Socket = null
    try {
      socket = new Socket(host, port)
      val iterator = bytesToObjects(socket.getInputStream())
      while(!isStopped && iterator.hasNext) {
        store(iterator.next)
      }
      ...
    } catch {
      case e: java.net.ConnectException =>
        restart("Error connecting to " + host + ":" + port, e)

      case NonFatal(e) =>
        logWarning("Error receiving data", e)
        restart("Error receiving data", e)

    } finally {
      ...
    }
  }

Definition

object Future {

    def apply[T](body: =>T)(implicit execctx: ExecutionContext): Future[T] = impl.Future(body)
}
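Because the ExecutionContext is the second, implicit parameter list of Future.apply, it can be supplied either explicitly or through an implicit value in scope. A small sketch, assuming a dedicated two-thread pool (the pool size is an arbitrary choice):

```scala
import java.util.concurrent.Executors
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._

// A dedicated pool wrapped as an ExecutionContext (size chosen arbitrarily).
val pool = Executors.newFixedThreadPool(2)
val ec: ExecutionContext = ExecutionContext.fromExecutorService(pool)

// Explicit: the second parameter list receives the context directly.
val viaExplicit: Future[Int] = Future(21 * 2)(ec)

// Implicit: an implicit value in scope is picked up by the compiler.
implicit val implicitEc: ExecutionContext = ec
val viaImplicit: Future[Int] = Future { 21 * 2 }

val a: Int = Await.result(viaExplicit, 5.seconds)
val b: Int = Await.result(viaImplicit, 5.seconds)

// Release the threads once the work is done.
pool.shutdown()
```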

Trait Promise is an object which can be completed with a value or failed with an exception.

  • promiseCompletion: If the promise has already been fulfilled, failed or has timed out, calling this method will throw an IllegalStateException.
  • allowedThrowables: If the throwable used to fail this promise is an error, a control exception or an interrupted exception, it will be wrapped as a cause within an ExecutionException which will fail the promise.
  • nonDeterministic: Note: Using this method may result in non-deterministic concurrent programs.

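The promiseCompletion note can be checked with a short sketch: completing an already completed promise through success throws IllegalStateException and leaves the stored value untouched.

```scala
import scala.concurrent.Promise
import scala.util.Try

val p = Promise[Int]()
p.success(1) // the first completion wins

// A second success(...) on the same promise throws; Try captures the exception.
val secondAttempt = Try(p.success(2))
val threwIllegalState: Boolean =
  secondAttempt.failed.toOption.exists(_.isInstanceOf[IllegalStateException])

// The value stored in the associated future is unchanged.
val stored = p.future.value
```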
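
  import scala.concurrent.Future
  import scala.concurrent.ExecutionContext.Implicits.global

  val s = "Hello"
  // Future { ... } replaces the lowercase future { ... }, deprecated since Scala 2.11
  val f: Future[String] = Future {
    s + " future!"
  }
  // onSuccess is deprecated since Scala 2.12; foreach is the replacement
  f onSuccess {
    case msg => println(msg)
  }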

Multiple callbacks may be registered; there is no guarantee that they will be executed in a particular order.
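A sketch of registering two callbacks on one future. It only shows that both run; it records whatever order they happened to run in and makes no assumption about that order. The latch and queue are helpers introduced for this example.

```scala
import java.util.concurrent.{ConcurrentLinkedQueue, CountDownLatch, TimeUnit}
import scala.concurrent.Future
import scala.concurrent.ExecutionContext.Implicits.global

// Thread-safe record of which callbacks ran, in the order they happened to run.
val seen = new ConcurrentLinkedQueue[String]()
val done = new CountDownLatch(2)

val greeting = Future { "Hello future!" }
greeting.onComplete { _ => seen.add("first"); done.countDown() }
greeting.onComplete { _ => seen.add("second"); done.countDown() }

// Wait until both callbacks have run (or give up after five seconds).
val bothRan: Boolean = done.await(5, TimeUnit.SECONDS)
println(seen)
```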

The future may contain a throwable object and this means that the future failed. Futures obtained through combinators have the same exception as the future they were obtained from.[?] The following throwable objects are not contained in the future:

  • Error - errors are not contained within futures
  • InterruptedException - not contained within futures
  • all scala.util.control.ControlThrowable except NonLocalReturnControl - not contained within futures

    Instead, the future is completed with an ExecutionException with one of the exceptions above as the cause. If a future is failed with a scala.runtime.NonLocalReturnControl, it is completed with a value from that throwable instead.
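The wrapping can be observed through a Promise, which avoids depending on how a thrown exception propagates out of a Future body (that part has varied between Scala versions). This sketch assumes the standard library's default promise implementation.

```scala
import java.util.concurrent.ExecutionException
import scala.concurrent.Promise
import scala.util.Failure

val p = Promise[Int]()
// Fail the promise with one of the throwables that is not stored directly.
p.failure(new InterruptedException("interrupted"))

// The future holds an ExecutionException whose cause is the original throwable.
val boxed: Boolean = p.future.value match {
  case Some(Failure(e: ExecutionException)) => e.getCause.isInstanceOf[InterruptedException]
  case _                                    => false
}
```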

  val f = Future { 5 }
  val g = Future { 3 }
  val h = for {
    x: Int <- f // returns Future(5)
    y: Int <- g // returns Future(3)
  } yield x + y

  is translated to:

  f flatMap { (x: Int) => g map { (y: Int) => x + y } }

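Both forms can be run side by side to confirm they produce the same result. A minimal sketch, assuming the global ExecutionContext:

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

val f = Future { 5 }
val g = Future { 3 }

// The for-comprehension form.
val viaFor: Future[Int] = for {
  x <- f
  y <- g
} yield x + y

// The desugared flatMap/map form.
val viaFlatMap: Future[Int] = f.flatMap(x => g.map(y => x + y))

val sumFor: Int = Await.result(viaFor, 5.seconds)
val sumFlatMap: Int = Await.result(viaFlatMap, 5.seconds)
```

Since f and g are created before the comprehension, they run concurrently; creating them inside it would run them one after the other.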

object ExecutionContext {
    def global: ExecutionContextExecutor = Implicits.global
    object Implicits {
        implicit lazy val global: ExecutionContextExecutor = impl.ExecutionContextImpl.fromExecutor(null: Executor)
    }
}

We used ExecutionContext.global, which manages a thread pool backed by java.util.concurrent.ForkJoinPool and uses it to perform the work encapsulated in the Futures.
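One way to see the pool at work is to compare thread names. In this sketch the body of the Future reports the thread it runs on; the exact worker name depends on the Scala version, so only the difference from the calling thread is checked.

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

// Name of the thread that creates the future.
val callerThread: String = Thread.currentThread().getName

// Name of the pool thread that actually runs the body.
val workerThread: String =
  Await.result(Future { Thread.currentThread().getName }, 5.seconds)

println(s"caller=$callerThread worker=$workerThread")
val ranElsewhere: Boolean = callerThread != workerThread
```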

Samples

import scala.concurrent.Future
import scala.concurrent.ExecutionContext.Implicits.global

def sleep(millis: Long) = {
  Thread.sleep(millis)
}

// Busy work ;)
def doWork(index: Int) = {
  sleep((math.random * 1000).toLong)
  index
}

(1 to 5) foreach { index =>
  val future = Future {
    doWork(index)
  }

  future onSuccess {
    case answer: Int => println(s"Success! returned: $answer")
  }
  future onFailure {
    case th: Throwable => println(s"FAILURE! returned: $th")
  }
}

// Wait long enough for the work to finish before the program exits
sleep(1000)

onSuccess takes a PartialFunction, which means you do not have to write a case for every possible value.
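A sketch of what "partial" means here, using a made-up describeEven function. onSuccess skips values with no matching case; Future.collect, shown below, applies the same kind of partial function but fails the resulting future when no case matches.

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._
import scala.util.Try

// Defined only for even numbers; odd inputs have no matching case.
val describeEven: PartialFunction[Int, String] = {
  case n if n % 2 == 0 => s"even: $n"
}

val definedFor4: Boolean = describeEven.isDefinedAt(4)
val definedFor3: Boolean = describeEven.isDefinedAt(3)

// collect applies the partial function to a successful result.
val matched: String = Await.result(Future(4).collect(describeEven), 5.seconds)

// With no matching case, the collected future fails with NoSuchElementException.
val unmatched: Try[String] =
  Try(Await.result(Future(3).collect(describeEven), 5.seconds))
```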

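import scala.concurrent.{Await, Future}
import scala.concurrent.duration.Duration
import scala.concurrent.ExecutionContext.Implicits.global

val futures = (0 to 9) map {
  i => Future {
    val s = i.toString
    print(s)
    s
  }
}
// Reduce the ten results into one string, then block until it is available
val f = Future.reduce(futures)((s1, s2) => s1 + s2)
val n = Await.result(f, Duration.Inf)

// A separate sample (run it on its own, since it reuses the name futures):
// completing futures with either a success or a failure
import scala.util.{Try, Success, Failure}

case class ThatsOdd(i: Int) extends RuntimeException(s"odd $i received!")

val doComplete: PartialFunction[Try[String],Unit] = {
  case s @ Success(_) => println(s)
  case f @ Failure(_) => println(f)
}

val futures = (0 to 9) map {
  case i if i % 2 == 0 => Future.successful(i.toString)
  case i => Future.failed(ThatsOdd(i))
}
futures map (_ onComplete doComplete)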

There is also Async, which lives in an optional package: Async Doc

A Future has an important property: it may only be assigned once. Once a Future object is given a value or an exception, it becomes in effect immutable and can never be overwritten.
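The assign-once property can be sketched with the non-throwing trySuccess and tryFailure methods of Promise, which return false when the promise is already completed:

```scala
import scala.concurrent.Promise

val p = Promise[String]()

// Only the first completion attempt is accepted.
val firstAccepted: Boolean = p.trySuccess("first")
val secondAccepted: Boolean = p.trySuccess("second")
val failureAccepted: Boolean = p.tryFailure(new RuntimeException("too late"))

// The future still holds the first value.
val finalValue = p.future.value
```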
